In [30]:
from cassandra.cluster import Cluster
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, min, max, date_format, to_date
from subprocess import check_output
from cassandra import ConsistencyLevel
import grpc
import station_pb2
import station_pb2_grpc

In [31]:
# Print the status of every node in the Cassandra cluster via the nodetool CLI.
!nodetool status

Datacenter: datacenter1
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address     Load       Tokens  Owns (effective)  Host ID                               Rack 
UN  172.21.0.4  86.02 KiB  16      100.0%            e8371820-8545-4295-a6ee-246e887e4985  rack1
UN  172.21.0.2  86.01 KiB  16      100.0%            b10f2eec-791a-4d47-9d37-84dec8be4cc0  rack1
UN  172.21.0.3  86.01 KiB  16      100.0%            8ff7e1fd-e673-40d7-a20d-491a81840f9a  rack1



In [32]:
# Contact points handed to the Cassandra driver; `cass` is the session that
# every later CQL statement in this notebook goes through.
contact_points = ['p6-db-1', 'p6-db-2', 'p6-db-3']
cluster = Cluster(contact_points)
cass = cluster.connect()

In [33]:
# Remove the `weather` keyspace if a previous run left one behind
# (`if exists` keeps this from failing when there is none).
cass.execute(
    "drop keyspace if exists weather"
)

<cassandra.cluster.ResultSet at 0x7f93abf109d0>

In [34]:
# Create the `weather` keyspace using SimpleStrategy with a replication factor of 3.
cass.execute(
    "create keyspace weather with replication="
    "{'class': 'SimpleStrategy', 'replication_factor': 3}"
)

<cassandra.cluster.ResultSet at 0x7f93abf10f10>

In [35]:
# User-defined type pairing a minimum and a maximum temperature (both INT).
station_record_ddl = """
create type weather.station_record (
    tmin INT,
    tmax INT
)
"""
cass.execute(station_record_ddl)

<cassandra.cluster.ResultSet at 0x7f93c45b0cd0>

In [36]:
# Stations table: partitioned by station id, one row per date (ascending),
# with the station name stored once per partition as a STATIC column.
stations_ddl = """
CREATE TABLE weather.stations (
    id TEXT,
    name TEXT STATIC,
    date DATE,
    record weather.station_record,
    PRIMARY KEY ((id), date)
) WITH CLUSTERING ORDER BY (date ASC)
"""
cass.execute(stations_ddl)

<cassandra.cluster.ResultSet at 0x7f93abf112d0>

In [37]:
#q1: What is the Schema of stations?
# Take the first row of the `describe table` result and show its
# create_statement field, which holds the table's DDL text.
schema_row = cass.execute("describe table weather.stations").one()
schema_row.create_statement

"CREATE TABLE weather.stations (\n    id text,\n    date date,\n    name text static,\n    record station_record,\n    PRIMARY KEY (id, date)\n) WITH CLUSTERING ORDER BY (date ASC)\n    AND additional_write_policy = '99p'\n    AND bloom_filter_fp_chance = 0.01\n    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}\n    AND cdc = false\n    AND comment = ''\n    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}\n    AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}\n    AND memtable = 'default'\n    AND crc_check_chance = 1.0\n    AND default_time_to_live = 0\n    AND extensions = {}\n    AND gc_grace_seconds = 864000\n    AND max_index_interval = 2048\n    AND memtable_flush_period_in_ms = 0\n    AND min_index_interval = 128\n    AND read_repair = 'BLOCKING'\n    AND speculative_retry = '99p';"

In [38]:
# Build the Spark session step by step: app name, the Cassandra connector
# package, and the connector's SQL extensions.
session_builder = SparkSession.builder.appName("p6")
session_builder = session_builder.config(
    'spark.jars.packages',
    'com.datastax.spark:spark-cassandra-connector_2.12:3.4.0',
)
session_builder = session_builder.config(
    "spark.sql.extensions",
    "com.datastax.spark.connector.CassandraSparkExtensions",
)
spark = session_builder.getOrCreate()

In [39]:
# Read the station listing as plain text; each input line becomes one row
# with a single string column.
stations_path = "ghcnd-stations.txt"
df = spark.read.text(stations_path)

In [40]:
# Slice the fixed-width text line into columns (substring positions are 1-based):
#   ID    -> 11 characters starting at position 1
#   STATE -> 2 characters starting at position 39
#   NAME  -> 30 characters starting at position 42
with_id = df.withColumn("ID", expr("substring(value, 1, 11)"))
df = with_id.withColumn("STATE", expr("substring(value, 39, 2)"))
df2 = df.withColumn("NAME", expr("substring(value, 42, 30)"))
# Expose the parsed frame to Spark SQL under the name "stations".
df2.createOrReplaceTempView("stations")

In [41]:
# Keep only the Wisconsin stations and bring them back to the driver as a list
# of Row objects. The filter is a DataFrame column expression rather than an
# RDD-level Python lambda, so it is evaluated by Spark itself and no row has to
# be passed through a Python function just to test its STATE value.
row_list = df2.filter(col("STATE") == "WI").collect()
df2.printSchema()



root
 |-- value: string (nullable = true)
 |-- ID: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- NAME: string (nullable = true)



                                                                                

In [42]:
# Prepared once, executed many times: inserts a station's id and name.
insert_wi_cql = """
INSERT INTO weather.stations (id, name)
VALUES (?, ?)
"""
insert_wi = cass.prepare(insert_wi_cql)

In [43]:
print(len(row_list))
# Run the prepared insert once per collected row, binding that row's ID and NAME.
for station_row in row_list:
    bound_values = (station_row["ID"], station_row["NAME"])
    cass.execute(insert_wi, bound_values)

1313


In [44]:
# Sanity check: how many rows does weather.stations hold after the inserts?
count_result = cass.execute("SELECT COUNT(*) FROM weather.stations")
count_result.one()



Row(count=1313)

In [45]:
#q2: what is the name corresponding to station ID USW00014837?
name_row = cass.execute("select NAME from weather.stations where ID = 'USW00014837'").one()
# The name was cut from a fixed 30-character slice of the source line, so trim
# surrounding whitespace before displaying it.
name_row[0].strip()

'MADISON DANE CO RGNL AP'

In [46]:
#q3: what is the token for the USC00470273 station?
# token(ID) is the partition token Cassandra computes for this station id.
token_row = cass.execute("select token(ID) from weather.stations where ID = 'USC00470273'").one()
token_row[0]

-9014250178872933741

In [47]:
# # example code for server.py insert
# station = "testID"
# date = "2023-11-22"
# tmin = 1
# tmax = 10
# tmax2 = 11
# insert_statement = cass.prepare("""
#             INSERT INTO weather.stations (id, date, record)
#             VALUES (?, ?, { tmin: ?, tmax: ? })
#             """)
# insert_statement.consistency_level = ConsistencyLevel.ONE
# cass.execute(insert_statement, (station, date, tmin, tmax))
# cass.execute(insert_statement, (station, date, tmin, tmax2))

In [48]:
# # testing max_statement for server.py after an insert with tmax != None
# max_statement = cass.prepare("""
#             SELECT MAX(record.tmax) FROM weather.stations WHERE id = ?
#         """)
# record_id = 'testID'
# max_statement.consistency_level = ConsistencyLevel.ONE
# res = cass.execute(max_statement, (record_id,)).one()[0]
# print(res, type(res))

In [49]:
# Capture what `!nodetool ring` would print, as decoded text.
ring_text = check_output("nodetool ring", shell=True).decode("utf-8")
# Drop the first 4 and the last 6 lines of the output.
# NOTE(review): presumably these are nodetool's header and trailer lines - confirm
# against the actual `nodetool ring` output for this Cassandra version.
list_output = ring_text.split("\n")[4:-6]

In [50]:
# The last whitespace-separated field of every remaining line, parsed as an int.
numbers = [int(ring_line.split()[-1]) for ring_line in list_output]
# Discard the first parsed value.
# NOTE(review): presumably the first line repeats the ring's wrap-around token
# rather than describing a vnode of its own - confirm.
numbers = numbers[1:]
numbers

[-8853427648287254574,
 -8580441399010981377,
 -8254963957330515747,
 -7875282195771140498,
 -7484399498776043088,
 -7220663310655111380,
 -6794376809373981990,
 -6393102388083830828,
 -6096647894947553369,
 -5548725740592669071,
 -5444174982799304106,
 -5000605994712615080,
 -4386798465870345889,
 -4108457510690085831,
 -3909308884020402820,
 -3064932236529241064,
 -3036956894350053019,
 -2265103565868110780,
 -2019105489607626003,
 -1624664006626657824,
 -1201433556995275209,
 -1006682974724457746,
 -553163390286625012,
 -125677675769695670,
 57658015048605649,
 464688014455802003,
 610238016941197970,
 1282359947068152797,
 1689610824388799628,
 1877662316028552047,
 2358115828293732336,
 2933130830069391600,
 3094031521004625977,
 3754726645541538076,
 3900495073147927099,
 4173404328452227635,
 4923681568720552455,
 5416924334132819607,
 5490499625661825737,
 5972805369049743543,
 6384288577211355106,
 6908718541841258963,
 7109522921358869034,
 7324356201412898843,
 7974293129725

In [51]:
#q4: what is the first vnode token in the ring following the token for USC00470273?
# Step 1: look up the partition token of this station.
token = cass.execute("select token(ID) from weather.stations where ID = 'USC00470273'").one()[0]
# Step 2: walk the vnode tokens in the order they were parsed from `nodetool ring`
# (ascending in the recorded output); the first one we encounter that is larger
# than the station's token is the answer.
# Wrap-around case: when no vnode token is larger, the answer is the first
# token in the ring, numbers[0].
# The fallback is supplied as next()'s default instead of testing `not answer`
# afterwards: 0 is a legal token value, and a truthiness test would have
# discarded a correct answer of 0 in favour of numbers[0].
answer = next((vnode for vnode in numbers if vnode > token), numbers[0])
answer

-8853427648287254574

In [52]:
# !unzip records.zip # only want to run once

In [53]:
# Part files inside the records.parquet/ directory.
# These names are hardcoded; listing the directory for files ending in
# ".snappy.parquet" would avoid that.
# https://spark.apache.org/docs/2.4.4/sql-data-sources-parquet.html
filenames = [
    "part-00000-574ab704-2431-4c8b-9d88-6c635a467b99-c000.snappy.parquet",
    "part-00001-574ab704-2431-4c8b-9d88-6c635a467b99-c000.snappy.parquet",
    "part-00002-574ab704-2431-4c8b-9d88-6c635a467b99-c000.snappy.parquet",
    "part-00003-574ab704-2431-4c8b-9d88-6c635a467b99-c000.snappy.parquet",
]
# One DataFrame per part file, read in list order.
Parquet0, Parquet1, Parquet2, Parquet3 = [
    spark.read.parquet("records.parquet/" + part_name) for part_name in filenames
]

In [54]:
# Register each part-file DataFrame as a temp view so Spark SQL can query it by name.
parquet_views = (
    ("parquet0", Parquet0),
    ("parquet1", Parquet1),
    ("parquet2", Parquet2),
    ("parquet3", Parquet3),
)
for view_name, part_frame in parquet_views:
    part_frame.createOrReplaceTempView(view_name)

In [55]:
# The four part-file views are stacked with UNION ALL (rows appended, not joined)
# and grouped per station + date:
# https://stackoverflow.com/questions/905379/what-is-the-difference-between-join-and-union
# Each row's `element` says what its `value` measures. CASE WHEN acts as an
# if/else, so MIN only sees values of TMIN rows and MAX only values of TMAX rows:
# https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-case.html
# https://stackoverflow.com/questions/25157451/spark-sql-case-when-then
daily_extremes = spark.sql("""
    SELECT station,
    date,
    MIN(CASE WHEN element = 'TMIN' THEN value END) as tmin,
    MAX(CASE WHEN element = 'TMAX' THEN value END) as tmax
    FROM (
        SELECT * FROM parquet0
        UNION ALL
        SELECT * FROM parquet1
        UNION ALL
        SELECT * FROM parquet2
        UNION ALL
        SELECT * FROM parquet3
    )
    GROUP BY station,date
""")
# Re-express the yyyyMMdd date as yyyy-MM-dd in a separate column:
# https://spark.apache.org/docs/2.3.0/api/sql/index.html#date_format
# https://stackoverflow.com/questions/49615307/changing-the-date-format-of-the-column-values-in-a-spark-dataframe
iso_date = date_format(to_date(col("date"), "yyyyMMdd"), "yyyy-MM-dd")
big_df = daily_extremes.withColumn("formatted_date", iso_date)
big_df.show()

+-----------+--------+------+-----+--------------+
|    station|    date|  tmin| tmax|formatted_date|
+-----------+--------+------+-----+--------------+
|USW00014837|20220222| -88.0|-38.0|    2022-02-22|
|USW00014837|20220624| 200.0|322.0|    2022-06-24|
|USW00014837|20220731| 161.0|278.0|    2022-07-31|
|USW00014837|20220802| 150.0|306.0|    2022-08-02|
|USW00014837|20220906| 117.0|256.0|    2022-09-06|
|USW00014837|20220912| 117.0|161.0|    2022-09-12|
|USW00014837|20221115| -10.0| 11.0|    2022-11-15|
|USW00014837|20220408|   0.0| 22.0|    2022-04-08|
|USW00014837|20221015|  -5.0|122.0|    2022-10-15|
|USW00014837|20221216| -50.0|-22.0|    2022-12-16|
|USW00014837|20220205|-188.0|-60.0|    2022-02-05|
|USW00014837|20220420|  50.0|100.0|    2022-04-20|
|USW00014837|20220508|  72.0|161.0|    2022-05-08|
|USW00014837|20221116| -21.0| 11.0|    2022-11-16|
|USW00014837|20220206| -77.0| 28.0|    2022-02-06|
|USW00014837|20220810| 144.0|283.0|    2022-08-10|
|USW00014837|20221203| -78.0| 5

In [56]:
# take big_df, loop over rows and insert into server.py
# making rpc calls to server.py

In [57]:
# Open an unencrypted gRPC channel and wrap it in the generated Station client stub.
server_address = "127.0.0.1:5440"
channel = grpc.insecure_channel(server_address)
stub = station_pb2_grpc.StationStub(channel)
# RPCs used below: stub.RecordTemps and stub.StationMax.
# Plan: iterate through big_df and send station=station, date=formatted_date,
# tmin=tmin, tmax=tmax for every row (the date may need to be in a CQL-friendly
# string format for the inserts).


In [58]:
# https://sparkbyexamples.com/spark/spark-foreach-usage-with-examples/
def send_row(row):
    """Send one aggregated station/day row to the server via ``stub.RecordTemps``.

    Args:
        row: Mapping-like row exposing ``"station"``, ``"formatted_date"``,
            ``"tmin"`` and ``"tmax"``.

    Returns:
        The reply object returned by ``stub.RecordTemps``, so the caller can
        inspect it (previously the reply was assigned and thrown away, hiding
        any error the server reported). ``None`` when the row is skipped
        because ``tmin`` or ``tmax`` is missing.
    """
    tmin = row["tmin"]
    tmax = row["tmax"]
    if tmin is None or tmax is None:
        # The aggregation yields NULL when a station/day has no TMIN or no TMAX
        # reading; int(None) would raise TypeError and abort the whole upload,
        # so incomplete rows are skipped instead.
        return None
    request = station_pb2.RecordTempsRequest(
        station=row["station"],
        date=row["formatted_date"],
        # The aggregated values arrive as floats (e.g. -88.0); the request is
        # given ints.
        tmin=int(tmin),
        tmax=int(tmax),
    )
    return stub.RecordTemps(request)
        
# big_df.foreach(send_row) was tried first to send each row to server.py.
# NOTE(review): it reportedly failed because the Spark partitions/cores did not
# know about `channel` and `stub` - unconfirmed. The rows are therefore
# collected and sent from this process instead.
collected_rows = big_df.collect()
for record in collected_rows:
    send_row(record)

In [59]:
#q5: what is the max temperature ever seen for station USW00014837?
# Ask the server through the StationMax RPC and show the tmax field of its reply.
max_request = station_pb2.StationMaxRequest(station = 'USW00014837')
r = stub.StationMax(max_request)
r.tmax

356

In [60]:
# cass.execute("SELECT * FROM weather.stations WHERE id = 'USW00014837'").one()

In [61]:
# Load weather.stations from Cassandra into a Spark DataFrame through the
# Spark-Cassandra connector; `df` now refers to this table.
cassandra_reader = (
    spark.read.format("org.apache.spark.sql.cassandra")
    .option("spark.cassandra.connection.host", "p6-db-1,p6-db-2,p6-db-3")
    .option("keyspace", "weather")
    .option("table", "stations")
)
df = cassandra_reader.load()

In [62]:
#q6: what tables/views are available in the Spark catalog?
# List everything currently registered in the session's catalog.
session_catalog = spark.catalog
session_catalog.listTables()

[Table(name='parquet0', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='parquet1', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='parquet2', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='parquet3', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='stations', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [63]:
#q7: what is the average difference between tmax and tmin, for each of the four stations that have temperature records?
# Only rows carrying both readings take part; they are collected to the driver
# and aggregated in plain Python.
has_both_readings = col("record.tmin").isNotNull() & col("record.tmax").isNotNull()
rows_with_record = df.filter(has_both_readings).collect()

dict_differences = {} # id -> running sum of (tmax - tmin)
dict_occurences = {} # id -> number of rows seen for that id

for record_row in rows_with_record:
    station_id = record_row["id"]
    spread = record_row["record"]["tmax"] - record_row["record"]["tmin"]
    # .get(..., 0) covers the first sighting of a station and later updates alike.
    dict_differences[station_id] = dict_differences.get(station_id, 0) + spread
    dict_occurences[station_id] = dict_occurences.get(station_id, 0) + 1

# id -> average spread, in the order the stations were first seen.
results = {
    station_id: dict_differences[station_id] / dict_occurences[station_id]
    for station_id in dict_differences
}
results
        

                                                                                

{'USW00014898': 102.93698630136986,
 'USR0000WDDG': 102.06849315068493,
 'USW00014837': 105.62739726027397,
 'USW00014839': 89.6986301369863}

23/11/23 04:55:15 WARN ChannelPool: [s0|p6-db-2/172.21.0.3:9042]  Error while opening new channel (ConnectionInitException: [s0|connecting...] Protocol initialization request, step 1 (STARTUP {CQL_VERSION=3.0.0, DRIVER_NAME=DataStax Java driver for Apache Cassandra(R), DRIVER_VERSION=4.13.0, CLIENT_ID=add2d1c1-e96f-40b0-b5c7-f030dacb594f, APPLICATION_NAME=Spark-Cassandra-Connector-local-1700714921475}): failed to send request (java.nio.channels.NotYetConnectedException))


In [69]:
#q8: what does nodetool status output?
# NOTE(review): the recorded output lists 172.21.0.3 as DN, so this cell was
# presumably run after that node was stopped outside the notebook - confirm.
!nodetool status

Datacenter: datacenter1
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address     Load        Tokens  Owns (effective)  Host ID                               Rack 
UN  172.21.0.4  182.34 KiB  16      100.0%            e8371820-8545-4295-a6ee-246e887e4985  rack1
UN  172.21.0.2  188.24 KiB  16      100.0%            b10f2eec-791a-4d47-9d37-84dec8be4cc0  rack1
DN  172.21.0.3  182.15 KiB  16      100.0%            8ff7e1fd-e673-40d7-a20d-491a81840f9a  rack1



In [70]:
#q9: if you make a StationMax RPC call, what does the error field contain in StationMaxReply reply?
# Same request as q5, but this time the reply's error field is displayed.
max_request = station_pb2.StationMaxRequest(station = 'USW00014837')
r = stub.StationMax(max_request)
r.error

'Need 3 replicas, but only have 2'

In [71]:
#q10: if you make a RecordTempsRequest RPC call, what does error contain in the RecordTempsReply reply?
# Send one made-up reading and display the error field of the reply.
probe_request = station_pb2.RecordTempsRequest(
    station = "abcd",
    date = "2023-11-22",
    tmin = -10,
    tmax = 10,
)
res = stub.RecordTemps(probe_request)
res.error

''

23/11/23 04:57:38 WARN ChannelPool: [s0|p6-db-2/172.21.0.3:9042]  Error while opening new channel (ConnectionInitException: [s0|connecting...] Protocol initialization request, step 1 (STARTUP {CQL_VERSION=3.0.0, DRIVER_NAME=DataStax Java driver for Apache Cassandra(R), DRIVER_VERSION=4.13.0, CLIENT_ID=add2d1c1-e96f-40b0-b5c7-f030dacb594f, APPLICATION_NAME=Spark-Cassandra-Connector-local-1700714921475}): failed to send request (com.datastax.oss.driver.shaded.netty.channel.StacklessClosedChannelException))
23/11/23 04:58:40 WARN ChannelPool: [s0|p6-db-2/172.21.0.3:9042]  Error while opening new channel (ConnectionInitException: [s0|connecting...] Protocol initialization request, step 1 (STARTUP {CQL_VERSION=3.0.0, DRIVER_NAME=DataStax Java driver for Apache Cassandra(R), DRIVER_VERSION=4.13.0, CLIENT_ID=add2d1c1-e96f-40b0-b5c7-f030dacb594f, APPLICATION_NAME=Spark-Cassandra-Connector-local-1700714921475}): failed to send request (com.datastax.oss.driver.shaded.netty.channel.StacklessClose