In [97]:
!nodetool status
# run after docker compose up -d
# might take ~30 seconds

Datacenter: datacenter1
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address     Load      Tokens  Owns (effective)  Host ID                               Rack 
UN  172.28.0.2  1.13 MiB  16      100.0%            ce4aaa17-9c65-487e-aaa0-9cc5e5d5099c  rack1
UN  172.28.0.4  1.16 MiB  16      100.0%            719f1e7c-fd4c-4465-8440-e9819e3ad4b1  rack1
UN  172.28.0.3  1.13 MiB  16      100.0%            77001d86-317c-4c56-b7bd-ef28189a806c  rack1



In [98]:
from cassandra.cluster import Cluster
cluster = Cluster(['p6-db-1', 'p6-db-2', 'p6-db-3'])
cass = cluster.connect()



In [142]:
cass.execute("DROP KEYSPACE IF EXISTS weather")

<cassandra.cluster.ResultSet at 0x7fd29665fb20>

In [143]:
cass.execute("""
CREATE KEYSPACE weather
WITH REPLICATION = { 
   'class' : 'SimpleStrategy', 
   'replication_factor' : 3 
};
""")

<cassandra.cluster.ResultSet at 0x7fd2bb498910>

In [144]:
cass.execute("USE weather")

<cassandra.cluster.ResultSet at 0x7fd293713a00>

In [145]:
cass.execute("CREATE TYPE station_record (tmin int, tmax int)")

<cassandra.cluster.ResultSet at 0x7fd2937139a0>

In [146]:
cass.execute("""
create table stations(
    id TEXT,
    name TEXT STATIC,
    date DATE,
    record weather.station_record,
    PRIMARY KEY ((id), date) 
) WITH CLUSTERING ORDER BY (date ASC)
""")

<cassandra.cluster.ResultSet at 0x7fd2983059f0>

In [147]:
#q1
#What is the Schema of stations?
cass.execute("describe table weather.stations").one().create_statement

"CREATE TABLE weather.stations (\n    id text,\n    date date,\n    name text static,\n    record station_record,\n    PRIMARY KEY (id, date)\n) WITH CLUSTERING ORDER BY (date ASC)\n    AND additional_write_policy = '99p'\n    AND bloom_filter_fp_chance = 0.01\n    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}\n    AND cdc = false\n    AND comment = ''\n    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}\n    AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}\n    AND memtable = 'default'\n    AND crc_check_chance = 1.0\n    AND default_time_to_live = 0\n    AND extensions = {}\n    AND gc_grace_seconds = 864000\n    AND max_index_interval = 2048\n    AND memtable_flush_period_in_ms = 0\n    AND min_index_interval = 128\n    AND read_repair = 'BLOCKING'\n    AND speculative_retry = '99p';"

In [148]:
from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .appName("p6")
         .config('spark.jars.packages', 'com.datastax.spark:spark-cassandra-connector_2.12:3.4.0')
         .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
         .getOrCreate())
# first build spark session to read the input file, then dump the file into cassandra table

In [106]:
df = spark.read.text("ghcnd-stations.txt")

In [149]:
df.take(10)

[Row(value='ACW00011604  17.1167  -61.7833   10.1    ST JOHNS COOLIDGE FLD                       '),
 Row(value='ACW00011647  17.1333  -61.7833   19.2    ST JOHNS                                    '),
 Row(value='AE000041196  25.3330   55.5170   34.0    SHARJAH INTER. AIRP            GSN     41196'),
 Row(value='AEM00041194  25.2550   55.3640   10.4    DUBAI INTL                             41194'),
 Row(value='AEM00041217  24.4330   54.6510   26.8    ABU DHABI INTL                         41217'),
 Row(value='AEM00041218  24.2620   55.6090  264.9    AL AIN INTL                            41218'),
 Row(value='AF000040930  35.3170   69.0170 3366.0    NORTH-SALANG                   GSN     40930'),
 Row(value='AFM00040938  34.2100   62.2280  977.2    HERAT                                  40938'),
 Row(value='AFM00040948  34.5660   69.2120 1791.3    KABUL INTL                             40948'),
 Row(value='AFM00040990  31.5000   65.8500 1010.0    KANDAHAR AIRPORT                      

In [108]:
from pyspark.sql.functions import col, expr, rtrim

In [150]:
df2 = (df.withColumn("ID", expr("substring(value, 1, 11)"))
       .withColumn("STATE", expr("substring(value, 39, 2)"))
       .withColumn("NAME", rtrim(expr("substring(value, 42, 30)")))
       .drop("value"))
# refer to the documentation of "ghcnd-stations.txt
# trailing spaces after name need to be trimmed

In [122]:
df2.take(10)

[Row(ID='ACW00011604', STATE='  ', NAME='ST JOHNS COOLIDGE FLD'),
 Row(ID='ACW00011647', STATE='  ', NAME='ST JOHNS'),
 Row(ID='AE000041196', STATE='  ', NAME='SHARJAH INTER. AIRP'),
 Row(ID='AEM00041194', STATE='  ', NAME='DUBAI INTL'),
 Row(ID='AEM00041217', STATE='  ', NAME='ABU DHABI INTL'),
 Row(ID='AEM00041218', STATE='  ', NAME='AL AIN INTL'),
 Row(ID='AF000040930', STATE='  ', NAME='NORTH-SALANG'),
 Row(ID='AFM00040938', STATE='  ', NAME='HERAT'),
 Row(ID='AFM00040948', STATE='  ', NAME='KABUL INTL'),
 Row(ID='AFM00040990', STATE='  ', NAME='KANDAHAR AIRPORT')]

In [151]:
filtered_df = df2.where(df2.STATE == "WI")

In [152]:
rows = filtered_df.collect()
len(rows) # 1313 rows with STATE == "WI"

1313

In [153]:
for row in rows:
    cass.execute("""
        INSERT INTO stations (ID, NAME)
        VALUES (%s, %s)
        """,(row.ID, row.NAME))
# weather_list is a list derived from spark
# this step is essentially moving data from spark to cassandra

In [154]:
cass.execute("SELECT COUNT(*) FROM weather.stations").one()[0]



1313

In [132]:
#q2
#What is the name corresponding to station ID USW00014837?
cass.execute("""
    SELECT NAME 
    FROM weather.stations 
    WHERE ID = 'USW00014837'
""").one()[0]

'MADISON DANE CO RGNL AP'

In [51]:
#q3
#what is the token for the USC00470273 station?
token_0273 = cass.execute("""
    SELECT TOKEN(ID)
    FROM weather.stations 
    WHERE ID = 'USC00470273'
""").one()[0]
token_0273

-9014250178872933741

In [52]:
import subprocess
output = subprocess.check_output(['nodetool', 'ring'])
output



In [134]:
output_str = output.decode('utf-8')
lines = output_str.split('\n')
tokens = []
for line in lines:
    parts = line.split()
    if len(parts) > 1 and parts[-1].lstrip('-').isdigit(): # without lstrip, token with leading "-" will be removed
        tokens.append(int(parts[-1]))
tokens

[-9067677327791489754,
 -9016749086415396146,
 -8472897347158022530,
 -8049825923049062738,
 -7426744533901497508,
 -7376855446923084241,
 -7232153990436711944,
 -6285558336230871981,
 -6156398109211132404,
 -5807721238204454210,
 -5441181688739710224,
 -5420482416500238763,
 -4829575785688340135,
 -4341109609052637106,
 -4000913458837126984,
 -3097589603372045132,
 -3050941484864753006,
 -2929412842497094172,
 -2130225360293509634,
 -1954899584629814717,
 -1911561437754667156,
 -1093889505142316362,
 -863602473937602457,
 -540220807779610996,
 -19225826446440701,
 -18133623916736823,
 717782068794156817,
 1078802487917432300,
 1421042403456142537,
 1797154876241758475,
 2056947940433546376,
 2492543019796175350,
 3040674881922350447,
 3510394424538602365,
 3835582241257133504,
 4008039125000885946,
 4328066357150953160,
 4931624141492071793,
 5403822238376532699,
 5598043677514784584,
 6022921252184284052,
 6139737931087426340,
 6867297899675445810,
 7217066973211827881,
 721911073853

In [135]:
#q4
#what is the first vnode token in the ring following the token for USC00470273?
for i, node in enumerate(tokens):
    if token_0273 > tokens[i] and i + 1 == len(tokens): # wrapping around
        ans = tokens[0]
        break
    elif token_0273 < tokens[i+1]:
        ans = tokens[i+1]
        break
ans

-8472897347158022530

In [30]:
!unzip records.zip

Archive:  records.zip
replace records.parquet/part-00000-574ab704-2431-4c8b-9d88-6c635a467b99-c000.snappy.parquet? [y]es, [n]o, [A]ll, [N]one, [r]ename: ^C


In [55]:
df_record = spark.read.parquet("records.parquet")

In [56]:
from pyspark.sql import functions as F
df2_record = (df_record.groupBy("station", "date")
                      .pivot("element", ["TMAX","TMIN"])
                      .agg(F.first("value"))
                      .orderBy("station"))
# Use Spark to load this and re-arrange the data so that there is 
# (a) one row per station/date combination, and (b) tmin and tmax columns
# Hint: The pivot function in PySpark might be useful to rearrange the data

In [57]:
df2_record.show(10)

+-----------+--------+------+------+
|    station|    date|  TMAX|  TMIN|
+-----------+--------+------+------+
|USR0000WDDG|20220806| 317.0| 217.0|
|USR0000WDDG|20220924| 161.0|  94.0|
|USR0000WDDG|20220628| 283.0| 161.0|
|USR0000WDDG|20220130| -33.0|-117.0|
|USR0000WDDG|20220919| 278.0| 139.0|
|USR0000WDDG|20220414|  50.0| -17.0|
|USR0000WDDG|20220629| 306.0| 150.0|
|USR0000WDDG|20221114|  17.0| -61.0|
|USR0000WDDG|20220712| 289.0| 156.0|
|USR0000WDDG|20220202|-106.0|-150.0|
+-----------+--------+------+------+
only showing top 10 rows



In [None]:
# cass.execute("""
# create table stations(
#     id TEXT,
#     name TEXT STATIC,
#     date DATE,
#     record weather.station_record,
#     PRIMARY KEY ((id), date) 
# ) WITH CLUSTERING ORDER BY (date ASC)
# """)

In [156]:
import grpc
import station_pb2 
import station_pb2_grpc 


rows = df2_record.collect()
channel = grpc.insecure_channel(f"localhost:5440") 
stub = station_pb2_grpc.StationStub(channel)

for row in rows:
    year = row.date[:4] # CQL requires that you insert date data in yyyy-mm-dd format
    month = row.date[4:6]
    day = row.date[6:]    
    request = station_pb2.RecordTempsRequest(
        station=row.station,
        date=str(year + "-" + month + "-" + day),
        tmin=int(row.TMIN),
        tmax=int(row.TMAX)
    )
    try:
        response = stub.RecordTemps(request)
        # print(response)
    except grpc.RpcError as e:
        print("gRPC call failed: {}".format(e))

In [157]:
result = cass.execute("""
SELECT *
FROM stations
WHERE id = 'USW00014837'
LIMIT 5
""")
for row in result:
    print(row)

Row(id='USW00014837', date=Date(18993), name='MADISON DANE CO RGNL AP', record=station_record(tmin=-99, tmax=-32))
Row(id='USW00014837', date=Date(18994), name='MADISON DANE CO RGNL AP', record=station_record(tmin=-166, tmax=-82))
Row(id='USW00014837', date=Date(18995), name='MADISON DANE CO RGNL AP', record=station_record(tmin=-177, tmax=-66))
Row(id='USW00014837', date=Date(18996), name='MADISON DANE CO RGNL AP', record=station_record(tmin=-88, tmax=-5))
Row(id='USW00014837', date=Date(18997), name='MADISON DANE CO RGNL AP', record=station_record(tmin=-116, tmax=-5))


In [158]:
#q5
#what is the max temperature ever seen for station USW00014837?
request = station_pb2.StationMaxRequest(station='USW00014837')
response = stub.StationMax(request)
response

tmax: 356

In [159]:
# already enabled CassandraSparkExtensions when creating your Spark session, so can create a Spark DataFrame corresponding to a Cassandra table like this
df = (spark.read.format("org.apache.spark.sql.cassandra")
      .option("spark.cassandra.connection.host", "p6-db-1,p6-db-2,p6-db-3")
      .option("keyspace", "weather")
      .option("table", "stations")
      .load())

In [160]:
# Create a temporary view in Spark named stations that corresponds to the stations table in Cassandra.
df.createOrReplaceTempView("stations")

In [161]:
#q6
#what tables/views are available in the Spark catalog?
spark.sql("DROP TABLE IF EXISTS id_state_name_table")
spark.catalog.listTables()

[Table(name='stations', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [162]:
#q7
#what is the average difference between tmax and tmin, for each of the four stations that have temperature records?
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf

def get_tmax(record):
    return record.tmax if record is not None else None

def get_tmin(record):
    return record.tmin if record is not None else None

get_tmax_udf = udf(get_tmax, IntegerType())
get_tmin_udf = udf(get_tmin, IntegerType())

spark.udf.register("get_tmax", get_tmax_udf)
spark.udf.register("get_tmin", get_tmin_udf)

diff_df = spark.sql("""
SELECT id, AVG(get_tmax(record) - get_tmin(record)) as diff
FROM stations
WHERE id = "USW00014839"
OR id = "USR0000WDDG"
OR id = "USW00014837"
OR id = "USW00014898"
GROUP BY id
""")
rows = diff_df.collect()
result_dict = {row['id']: row['diff'] for row in rows}
result_dict

23/11/18 09:08:43 WARN SimpleFunctionRegistry: The function get_tmax replaced a previously registered function.
23/11/18 09:08:43 WARN SimpleFunctionRegistry: The function get_tmin replaced a previously registered function.
                                                                                

{'USW00014837': 105.62739726027397,
 'USR0000WDDG': 102.06849315068493,
 'USW00014839': 89.6986301369863,
 'USW00014898': 102.93698630136986}

In [68]:
#q8
#what does nodetool status output?
! nodetool status

Datacenter: datacenter1
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address     Load        Tokens  Owns (effective)  Host ID                               Rack 
UN  172.28.0.2  87.75 KiB   16      100.0%            ce4aaa17-9c65-487e-aaa0-9cc5e5d5099c  rack1
UN  172.28.0.4  113.17 KiB  16      100.0%            719f1e7c-fd4c-4465-8440-e9819e3ad4b1  rack1
DN  172.28.0.3  87.75 KiB   16      100.0%            77001d86-317c-4c56-b7bd-ef28189a806c  rack1





In [69]:
#q9
#if you make a StationMax RPC call, what does the error field contain in StationMaxReply reply?
request = station_pb2.StationMaxRequest(station='USW00014837')
response = stub.StationMax(request)
response

error: "Error from server: code=1000 [Unavailable exception] message=\"Cannot achieve consistency level THREE\" info={\'consistency\': \'THREE\', \'required_replicas\': 3, \'alive_replicas\': 2}"

23/11/17 11:28:14 WARN ChannelPool: [s0|p6-db-2/172.28.0.3:9042]  Error while opening new channel (ConnectionInitException: [s0|connecting...] Protocol initialization request, step 1 (STARTUP {CQL_VERSION=3.0.0, DRIVER_NAME=DataStax Java driver for Apache Cassandra(R), DRIVER_VERSION=4.13.0, CLIENT_ID=08e5e5bc-8cee-4c35-94d3-32e0bb42b59a, APPLICATION_NAME=Spark-Cassandra-Connector-local-1700219919669}): failed to send request (java.nio.channels.NotYetConnectedException))


In [70]:
#q10
#if you make a RecordTempsRequest RPC call, what does error contain in the RecordTempsReply reply?
request = station_pb2.RecordTempsRequest(
    station="UWMADISON",
    date=str("2023-4-20"),
    tmin=int(0),
    tmax=int(100)
)

response = stub.RecordTemps(request)
response



23/11/17 11:28:33 WARN ChannelPool: [s0|p6-db-2/172.28.0.3:9042]  Error while opening new channel (ConnectionInitException: [s0|connecting...] Protocol initialization request, step 1 (STARTUP {CQL_VERSION=3.0.0, DRIVER_NAME=DataStax Java driver for Apache Cassandra(R), DRIVER_VERSION=4.13.0, CLIENT_ID=08e5e5bc-8cee-4c35-94d3-32e0bb42b59a, APPLICATION_NAME=Spark-Cassandra-Connector-local-1700219919669}): failed to send request (com.datastax.oss.driver.shaded.netty.channel.StacklessClosedChannelException))
23/11/17 11:29:10 WARN ChannelPool: [s0|p6-db-2/172.28.0.3:9042]  Error while opening new channel (ConnectionInitException: [s0|connecting...] Protocol initialization request, step 1 (STARTUP {CQL_VERSION=3.0.0, DRIVER_NAME=DataStax Java driver for Apache Cassandra(R), DRIVER_VERSION=4.13.0, CLIENT_ID=08e5e5bc-8cee-4c35-94d3-32e0bb42b59a, APPLICATION_NAME=Spark-Cassandra-Connector-local-1700219919669}): failed to send request (com.datastax.oss.driver.shaded.netty.channel.StacklessClose