In [2]:
# Show Cassandra cluster membership: per-node status/state, load, vnode token count and effective ownership.
!nodetool status

Datacenter: datacenter1
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address     Load        Tokens  Owns (effective)  Host ID                               Rack 
UN  172.22.0.2  151.38 KiB  16      80.4%             98d6bc19-ebce-434a-bbf4-b04836f5f591  rack1
UN  172.22.0.3  151.71 KiB  16      60.1%             3b2f5122-1b49-46b7-a52a-479b2f479a25  rack1
UN  172.22.0.4  151.28 KiB  16      59.5%             6b9500ae-0149-45ca-af77-9930922a6587  rack1



In [3]:
# Connect to the three Cassandra nodes and (re)create the `weather` keyspace, UDT and table from scratch.
from cassandra.cluster import Cluster
# Contact points for the three database containers.
cluster = Cluster(['p6-db-1', 'p6-db-2', 'p6-db-3'])
cass = cluster.connect()

# Start from a clean slate: any data left over from earlier runs is discarded.
cass.execute("DROP KEYSPACE IF EXISTS weather")

# replication_factor 3 on a three-node cluster means every node holds a replica of every partition.
cass.execute("""
CREATE KEYSPACE weather WITH replication = {
    'class': 'SimpleStrategy',
    'replication_factor': 3
}
""")

# A (tmin, tmax) temperature pair, used as the per-date `record` column below.
cass.execute("""
CREATE TYPE weather.station_record (
    tmin int,
    tmax int
)
""")

# One partition per station (`id`), one row per `date` in ascending order.
# `name` is static: stored once per partition and shared by all of that station's rows.
# The value of this final execute() is the cell's displayed output.
cass.execute("""
CREATE TABLE weather.stations (
    id text,
    name text static,
    date date,
    record weather.station_record,
    PRIMARY KEY (id, date)
) WITH CLUSTERING ORDER BY (date ASC)
""")

<cassandra.cluster.ResultSet at 0x75c0f82f4e50>

In [4]:
#q1
# DESCRIBE TABLE yields one row; its `create_statement` column holds the full CQL DDL text.
describe_row = cass.execute("DESCRIBE TABLE weather.stations").one()
create_statement = describe_row.create_statement
create_statement

"CREATE TABLE weather.stations (\n    id text,\n    date date,\n    name text static,\n    record station_record,\n    PRIMARY KEY (id, date)\n) WITH CLUSTERING ORDER BY (date ASC)\n    AND additional_write_policy = '99p'\n    AND bloom_filter_fp_chance = 0.01\n    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}\n    AND cdc = false\n    AND comment = ''\n    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}\n    AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}\n    AND memtable = 'default'\n    AND crc_check_chance = 1.0\n    AND default_time_to_live = 0\n    AND extensions = {}\n    AND gc_grace_seconds = 864000\n    AND max_index_interval = 2048\n    AND memtable_flush_period_in_ms = 0\n    AND min_index_interval = 128\n    AND read_repair = 'BLOCKING'\n    AND speculative_retry = '99p';"

In [5]:
# Start (or reuse, via getOrCreate) a Spark session with the DataStax Spark Cassandra
# connector available, so Cassandra tables can be read as DataFrames later on.
from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .appName("p6")
         # Connector jar (Scala 2.12 build, version 3.4.0), resolved from Maven via Ivy at session start.
         .config('spark.jars.packages', 'com.datastax.spark:spark-cassandra-connector_2.12:3.4.0')
         # Register the connector's Spark SQL extensions.
         .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
         .getOrCreate())

:: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f4776a6d-abbb-4f4d-923a-30c58a5c1bc7;1.0
	confs: [default]
	found com.datastax.spark#spark-cassandra-connector_2.12;3.4.0 in central
	found com.datastax.spark#spark-cassandra-connector-driver_2.12;3.4.0 in central
	found com.datastax.oss#java-driver-core-shaded;4.13.0 in central
	found com.datastax.oss#native-protocol;1.5.0 in central
	found com.datastax.oss#java-driver-shaded-guava;25.1-jre-graal-sub-1 in central
	found com.typesafe#config;1.4.1 in central
	found org.slf4j#slf4j-api;1.7.26 in central
	found io.dropwizard.metrics#metrics-core;4.1.18 in central
	found org.hdrhistogram#HdrHistogram;2.1.12 in central
	found org.reactivestreams#reactive-streams;1.0.3 in central
	found com.github.stephenc.jcip#jcip-annotations;1.0-1 in central
	found com.gith

In [6]:
# Load the GHCN station list, keep only Wisconsin stations, and store each station's name in Cassandra.
# ghcnd-stations.txt is fixed-width; substr() is 1-indexed: ID = cols 1-11, STATE = 39-40, NAME = 42-71.
df = spark.read.text("ghcnd-stations.txt")

stations_df = df.select(
    df.value.substr(1, 11).alias("id"),
    df.value.substr(39, 2).alias("state"),
    df.value.substr(42, 30).alias("name")
)

wi_stations = stations_df.filter(stations_df.state == "WI")

local_stations = wi_stations.collect()

# `name` is a static column, so it can be written with only the partition key.
# This avoids fabricating a clustering row dated with the insert day (todate(now()))
# and an empty record: such a row is not a real observation and changes with the run date.
# Preparing once avoids re-parsing the statement for every station.
insert_station = cass.prepare("INSERT INTO weather.stations (id, name) VALUES (?, ?)")

for station in local_stations:
    # Fixed-width NAME field is space-padded; strip it before storing.
    cass.execute(insert_station, (station.id, station.name.strip()))

                                                                                

In [7]:
#q2
# Look up the static `name` of one station, binding the id as a query parameter.
name_row = cass.execute("SELECT name FROM weather.stations WHERE id = %s", ("US1WIMR0003",)).one()
result = name_row.name
result

'AMBERG 1.3 SW'

In [8]:
#q3
# token(id) is the partition key's position on the token ring; it decides which vnode stores the station.
token_row = cass.execute("SELECT token(id) FROM weather.stations WHERE id = %s", ("USC00470273",)).one()
currToken = token_row[0]
currToken

-9014250178872933741

In [10]:
import bisect
import subprocess


def parse_ring_tokens(ring_text):
    """Return the sorted, de-duplicated vnode tokens listed in `nodetool ring` output.

    Each ring row is expected to end with its token. The LAST field is used rather than
    a fixed column index, so a multi-word column (e.g. a load value with a unit such as
    ``151.38 KiB``) cannot shift it. Lines with fewer than two fields, or whose last
    field is not an integer (headers, separators, warnings, blank lines), are ignored.
    This replaces hard-coded header/footer slicing.
    """
    tokens = set()
    for line in ring_text.splitlines():
        fields = line.split()
        if len(fields) < 2:
            continue
        try:
            tokens.add(int(fields[-1]))
        except ValueError:
            continue
    return sorted(tokens)


def find_vnode_token(token, ring_tokens):
    """Return the ring token of the vnode whose range contains `token`.

    A vnode with ring token t owns the range (previous ring token, t]. The owner is
    therefore the smallest ring token >= `token`. A token larger than every ring token
    wraps around to the smallest one.

    Raises ValueError if `ring_tokens` is empty.
    """
    ring = sorted(ring_tokens)
    if not ring:
        raise ValueError("no ring tokens to search")
    idx = bisect.bisect_left(ring, token)  # first index with ring[idx] >= token
    return ring[idx] if idx < len(ring) else ring[0]


# List args instead of shell=True: no shell is needed to run a fixed command.
ring_output = subprocess.check_output(["nodetool", "ring"], text=True)
tokens = parse_ring_tokens(ring_output)
vnode_token = find_vnode_token(currToken, tokens)

In [11]:
#q4
# Ring token of the vnode that the previous cell selected for currToken.
vnode_token

-8784482959056695711

In [12]:
# Extract records.zip into the working directory. -n never overwrites existing files, so re-running is harmless.
!unzip -n records.zip

Archive:  records.zip


In [14]:
from pyspark.sql.functions import first, col
from datetime import datetime
import grpc
import station_pb2
import station_pb2_grpc

# Records are long-format (one row per station/date/element). Keep only TMAX/TMIN and
# pivot them into one row per station/date with TMAX and TMIN columns.
df = spark.read.parquet("records.parquet")
filtered_df = df.filter(col("element").isin("TMAX", "TMIN"))
# Passing the pivot values explicitly avoids an extra Spark job to discover them.
# It also guarantees both columns exist even if one element never occurs in the data.
pivot_df = (filtered_df
            .groupBy("station", "date")
            .pivot("element", ["TMAX", "TMIN"])
            .agg(first("value")))
weather_data = pivot_df.collect()

channel = grpc.insecure_channel('localhost:5440')
stub = station_pb2_grpc.StationStub(channel)

skipped = 0
for row in weather_data:
    t_max = row['TMAX']
    t_min = row['TMIN']
    # A station/date with only one of the two elements pivots to None for the other.
    # int(None) would raise TypeError, which the RpcError handler below does not catch.
    if t_max is None or t_min is None:
        skipped += 1
        continue
    # Dates arrive as YYYYMMDD strings; the server is sent YYYY-MM-DD.
    date = datetime.strptime(row['date'], '%Y%m%d').strftime('%Y-%m-%d')
    request = station_pb2.RecordTempsRequest(
        station=row['station'], date=date, tmin=int(t_min), tmax=int(t_max))
    try:
        reply = stub.RecordTemps(request)
    except grpc.RpcError as e:
        print("Failed to send data:", e.details())
        continue
    # NOTE(review): assumes RecordTemps replies carry an `error` string, as StationMax
    # replies do. Confirm against station.proto.
    if getattr(reply, "error", ""):
        print("Server reported error:", reply.error)

if skipped:
    print(f"Skipped {skipped} station/date rows missing TMIN or TMAX")

                                                                                

In [15]:
#q5
# Ask the server (StationMax RPC) for this station's maximum tmax and show the `tmax` field.
# NOTE(review): the reply's `error` field is not checked here; q9 inspects it separately.
new_id = 'USW00014837'
stub.StationMax(station_pb2.StationMaxRequest(station=new_id)).tmax

356

In [16]:
# Read the Cassandra table weather.stations through the connector and expose it to Spark SQL as `stations`.
cassandra_options = {
    "spark.cassandra.connection.host": "p6-db-1,p6-db-2,p6-db-3",
    "keyspace": "weather",
    "table": "stations",
}
df = (spark.read
      .format("org.apache.spark.sql.cassandra")
      .options(**cassandra_options)
      .load())
df.createOrReplaceTempView("stations")

In [17]:
#q6
# Tables and views visible in the current Spark catalog, including temporary views such as `stations`.
tables = spark.catalog.listTables()
tables

[Table(name='stations', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [18]:
#q7
# Per-station average of (tmax - tmin) over its dated rows, ignoring rows with no record.
avg_spread_sql = """
    SELECT id, AVG(record.tmax - record.tmin)
    FROM stations
    WHERE record IS NOT NULL
    GROUP BY id
"""
res = spark.sql(avg_spread_sql)

# Each result row is (id, average); build {id: average}.
res_dict = {row[0]: row[1] for row in res.collect()}
res_dict

                                                                                

{'USR0000WDDG': 102.06849315068493,
 'USW00014837': 105.62739726027397,
 'USW00014839': 89.6986301369863,
 'USW00014898': 102.93698630136986}

In [19]:
#q8
# Re-check cluster status after loading data; compare Load and Owns (effective) with the first check.
!nodetool status

Datacenter: datacenter1
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address     Load        Tokens  Owns (effective)  Host ID                               Rack 
UN  172.22.0.2  133.07 KiB  16      100.0%            98d6bc19-ebce-434a-bbf4-b04836f5f591  rack1
UN  172.22.0.3  133.39 KiB  16      100.0%            3b2f5122-1b49-46b7-a52a-479b2f479a25  rack1
UN  172.22.0.4  132.96 KiB  16      100.0%            6b9500ae-0149-45ca-af77-9930922a6587  rack1



In [20]:
#q9
# Same StationMax request as q5, but showing the reply's `error` field (an empty string presumably means no error).
new_id = 'USW00014837'
stub.StationMax(station_pb2.StationMaxRequest(station=new_id)).error

''

In [21]:
#q10
# Send one hand-written reading through the RecordTemps RPC and display the server's reply.
station_id = "US312344"
date = "2022-04-25"
t_min, t_max = -5, 30
request = station_pb2.RecordTempsRequest(station=station_id, date=date, tmin=t_min, tmax=t_max)
stub.RecordTemps(request)

