In [1]:
# Print the cluster membership table (state, load, token count, ownership)
# as reported by nodetool.
!nodetool status

Datacenter: datacenter1
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address        Load        Tokens  Owns (effective)  Host ID                               Rack 
UN  192.168.144.3  121.45 KiB  16      100.0%            67dca57f-7b20-4fca-9d27-4236d876c191  rack1
UN  192.168.144.4  121.42 KiB  16      100.0%            9aa0ef31-b9ee-42ed-9517-808e0336a12f  rack1
UN  192.168.144.2  126.51 KiB  16      100.0%            5c3b3a31-6a8a-46fe-a32c-e6383b30ada6  rack1



In [2]:
from pyspark.sql.functions import expr
from subprocess import check_output
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import grpc
import station_pb2
import station_pb2_grpc
from datetime import datetime

In [3]:
from cassandra.cluster import Cluster
# Host names of the three database containers, used as the driver's contact points.
contact_points = ['p6-db-1', 'p6-db-2', 'p6-db-3']
cluster = Cluster(contact_points)
# `cass` is the session object every later cell uses to run CQL.
cass = cluster.connect()

In [4]:
# Schema definitions for the `weather` keyspace, kept together so the whole
# layout can be read at a glance before anything is executed.
keyspace_ddl = """
    CREATE KEYSPACE weather
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};
"""

record_type_ddl = """
    CREATE TYPE station_record (
        tmin int,
        tmax int
    );
"""

stations_table_ddl = """
    CREATE TABLE stations (
        id text,
        name text STATIC,
        date date,
        record station_record,
        PRIMARY KEY (id, date)
    ) WITH CLUSTERING ORDER BY (date ASC);
"""

# Drop first so the cell starts from a clean keyspace every time it runs.
cass.execute("DROP KEYSPACE IF EXISTS weather;")
cass.execute(keyspace_ddl)
cass.set_keyspace('weather')

# The user-defined type must exist before the table that references it.
for ddl in (record_type_ddl, stations_table_ddl):
    cass.execute(ddl)

print("Database setup complete.")

Database setup complete.


In [5]:
#q1
# DESCRIBE TABLE yields one row; its create_statement column is the table's CQL DDL.
table_description = cass.execute("DESCRIBE TABLE weather.stations").one()
table_description.create_statement

"CREATE TABLE weather.stations (\n    id text,\n    date date,\n    name text static,\n    record station_record,\n    PRIMARY KEY (id, date)\n) WITH CLUSTERING ORDER BY (date ASC)\n    AND additional_write_policy = '99p'\n    AND bloom_filter_fp_chance = 0.01\n    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}\n    AND cdc = false\n    AND comment = ''\n    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}\n    AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}\n    AND memtable = 'default'\n    AND crc_check_chance = 1.0\n    AND default_time_to_live = 0\n    AND extensions = {}\n    AND gc_grace_seconds = 864000\n    AND max_index_interval = 2048\n    AND memtable_flush_period_in_ms = 0\n    AND min_index_interval = 128\n    AND read_repair = 'BLOCKING'\n    AND speculative_retry = '99p';"

In [6]:
# Settings that pull in the Cassandra connector and register its SQL extensions.
spark_options = {
    'spark.jars.packages': 'com.datastax.spark:spark-cassandra-connector_2.12:3.4.0',
    "spark.sql.extensions": "com.datastax.spark.connector.CassandraSparkExtensions",
}

session_builder = SparkSession.builder.appName("p6")
for option_key, option_value in spark_options.items():
    session_builder = session_builder.config(option_key, option_value)

# `spark` is the session every later cell reads data through.
spark = session_builder.getOrCreate()

:: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-fa60381c-154f-4146-b0ed-70b1f8eb3e7f;1.0
	confs: [default]
	found com.datastax.spark#spark-cassandra-connector_2.12;3.4.0 in central
	found com.datastax.spark#spark-cassandra-connector-driver_2.12;3.4.0 in central
	found com.datastax.oss#java-driver-core-shaded;4.13.0 in central
	found com.datastax.oss#native-protocol;1.5.0 in central
	found com.datastax.oss#java-driver-shaded-guava;25.1-jre-graal-sub-1 in central
	found com.typesafe#config;1.4.1 in central
	found org.slf4j#slf4j-api;1.7.26 in central
	found io.dropwizard.metrics#metrics-core;4.1.18 in central
	found org.hdrhistogram#HdrHistogram;2.1.12 in central
	found org.reactivestreams#reactive-streams;1.0.3 in central
	found com.github.stephenc.jcip#jcip-annotations;1.0-1 in central
	found com.gith

In [7]:
# Parse the fixed-width station list. Spark's substring() is 1-based, so the
# slices below read characters 1-11 (station id), 39-40 (state code) and
# 42-71 (station name, trimmed of its padding).
path = "ghcnd-stations.txt"
stations_df = spark.read.text(path)
processed_stations = stations_df.select(
    expr("substring(value, 1, 11)").alias("id"),
    expr("substring(value, 39, 2)").alias("state"),
    expr("trim(substring(value, 42, 30))").alias("name")
)

# filter WI
stations_wi = processed_stations.filter(processed_stations.state == "WI")

# Recreate the table so re-running this cell always starts from an empty
# table. The definition mirrors the one used when the keyspace was set up,
# including the (default) ascending clustering order on `date`.
cass.execute("DROP TABLE IF EXISTS weather.stations;")
cass.execute("""
create table weather.stations(
    id text,
    name text static,
    date date,
    record station_record,
    PRIMARY KEY(id, date)
) WITH CLUSTERING ORDER BY (date ASC)
""")

# Prepare the INSERT once and bind values per row: the statement is parsed a
# single time instead of once per station. Prepared statements use `?`
# placeholders rather than the `%s` form of simple statements.
insert_station = cass.prepare(
    "INSERT INTO weather.stations (id, name) VALUES (?, ?)"
)

# Only `id` and the static `name` are written here; `date`/`record` are left
# unset for these rows.
for row in stations_wi.collect():
    cass.execute(insert_station, (row["id"], row["name"]))

                                                                                

In [8]:
#q2
# Look up the static station name for a single partition key.
name_query = """ 
    SELECT name 
    FROM weather.stations 
    WHERE id = 'US1WIMR0003'
    """
q2 = cass.execute(name_query).one()[0]
q2

'AMBERG 1.3 SW'

In [9]:
#q3
# Ask Cassandra for the partitioner token of one station id; `q3` is reused
# by the ring lookup in the following cell.
token_query = """
    SELECT token(id) 
    FROM weather.stations 
    WHERE id = 'USC00470273'
    """
token_row = cass.execute(token_query).one()
q3 = token_row[0]
q3

-9014250178872933741

In [10]:
#q4
def _ring_tokens(ring_text):
    """Return the sorted, de-duplicated vnode tokens found in `nodetool ring` output.

    The token is the last whitespace-separated field of each data line, so
    only that field is considered. Lines whose last field is not an integer
    (headers, separators, warnings) are skipped, and integer-looking values
    in other columns are never mistaken for tokens.
    """
    found = set()
    for line in ring_text.splitlines():
        fields = line.split()
        if not fields:
            continue
        try:
            found.add(int(fields[-1]))
        except ValueError:
            continue
    return sorted(found)


def _owning_vnode_token(ring, row_token):
    """Return the token of the vnode responsible for `row_token`.

    A vnode with token T is responsible for the range (previous token, T],
    so the owner is the first ring token that is >= `row_token`. A row token
    above every ring token wraps around to the smallest one.

    Raises:
        ValueError: if `ring` is empty.
    """
    if not ring:
        raise ValueError("no tokens found in `nodetool ring` output")
    return next((token for token in ring if token >= row_token), ring[0])


# Local names deliberately avoid `min`/`max` so the builtins stay usable in
# the rest of the notebook.
ring_output = check_output(["nodetool", "ring"]).decode()
print(_owning_vnode_token(_ring_tokens(ring_output), q3))

-8849592649763465485


In [11]:
# Unpack the records archive; -n never overwrites files that already exist,
# so re-running the cell is harmless.
!unzip -n records.zip

Archive:  records.zip


In [12]:
# Read the records and pivot the `element` column so that each
# (station, date) group exposes TMIN and TMAX as separate columns.
raw_records = spark.read.parquet("records.parquet")
records = raw_records.groupBy("station", "date").pivot("element", ["TMIN", "TMAX"])

# `records` is still grouped data at this point; sum() materialises the pivot.
records.sum().toPandas()

                                                                                

Unnamed: 0,station,date,TMIN,TMAX
0,USW00014898,20220107,-166.0,-71.0
1,USW00014839,20220924,117.0,194.0
2,USW00014839,20220523,83.0,150.0
3,USW00014839,20221019,11.0,83.0
4,USW00014839,20220529,139.0,261.0
...,...,...,...,...
1455,USW00014898,20220724,167.0,278.0
1456,USW00014837,20221004,50.0,222.0
1457,USW00014837,20221107,17.0,94.0
1458,USW00014898,20221006,56.0,200.0


In [13]:
# Send every pivoted (station, date) temperature pair to the gRPC station
# server listening on localhost:5440.
channel = grpc.insecure_channel("localhost:5440")
channel_stub = station_pb2_grpc.StationStub(channel)
collection = records.sum().collect()

# (station, date, reason) for every row that could not be stored.
failed_uploads = []

for row in collection:
    station, raw_date = row["station"], row["date"]
    tmin, tmax = row["TMIN"], row["TMAX"]

    # The pivot leaves a null when a station/date has no value for an
    # element; int(None) would abort the whole upload, so record and skip.
    if tmin is None or tmax is None:
        failed_uploads.append((station, raw_date, "missing TMIN or TMAX"))
        continue

    # Source dates are YYYYMMDD strings; the request carries YYYY-MM-DD.
    iso_date = datetime.strptime(raw_date, "%Y%m%d").strftime("%Y-%m-%d")
    response = channel_stub.RecordTemps(
        station_pb2.RecordTempsRequest(
            station=station, date=iso_date, tmin=int(tmin), tmax=int(tmax)
        )
    )

    # The reply reports failure through its `error` string (empty on
    # success); keep it instead of silently discarding it.
    if response.error:
        failed_uploads.append((station, iso_date, response.error))

# Stay silent on full success; otherwise make the partial upload visible.
if failed_uploads:
    print(
        f"{len(failed_uploads)} of {len(collection)} records were not stored; "
        f"first failures: {failed_uploads[:5]}"
    )

In [14]:
#q5
# Ask the station server for the highest tmax it has stored for this station.
max_request = station_pb2.StationMaxRequest(station="USW00014837")
max_reply = channel_stub.StationMax(max_request)
q5 = max_reply.tmax
q5

356

In [15]:
#q6
# Expose the Cassandra table weather.stations to Spark SQL as the temporary
# view `stations`.
stations_reader = (
    spark.read.format("org.apache.spark.sql.cassandra")
    .option("spark.cassandra.connection.host", "p6-db-1,p6-db-2,p6-db-3")
    .option("keyspace", "weather")
    .option("table", "stations")
)
stations_reader.load().createOrReplaceTempView("stations")

# Listing the catalog shows that the view is now registered.
spark.catalog.listTables()

[Table(name='stations', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [16]:
#q7
# Uses the existing `spark` session directly. Calling
# SparkSession.builder...getOrCreate() again only hands back that same
# session, ignores the new app name and logs a warning.
stations_sql = spark.sql("SELECT * FROM stations")

# Per-row spread between the recorded maximum and minimum temperature.
temp_diff = stations_sql.withColumn("temperature_diff", col("record.tmax") - col("record.tmin"))

# Rows without a `record` carry no temperatures, so they are dropped.
filtered = temp_diff.filter(col("record").isNotNull())

# Average spread per station id, collected into a plain {id: average} dict.
avg_diff = filtered.groupBy("id").avg("temperature_diff")
avg_diff_dict_q7 = {row['id']: row['avg(temperature_diff)'] for row in avg_diff.collect()}

avg_diff_dict_q7

24/04/18 22:59:34 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

{'USW00014839': 89.6986301369863,
 'USW00014837': 105.62739726027397,
 'USW00014898': 102.93698630136986,
 'USR0000WDDG': 102.06849315068493}

In [17]:
#q8
# Print the cluster membership table again; the Load column can be compared
# with the values shown by the first cell of the notebook.
!nodetool status

Datacenter: datacenter1
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address        Load        Tokens  Owns (effective)  Host ID                               Rack 
UN  192.168.144.3  227.73 KiB  16      100.0%            67dca57f-7b20-4fca-9d27-4236d876c191  rack1
UN  192.168.144.4  226.99 KiB  16      100.0%            9aa0ef31-b9ee-42ed-9517-808e0336a12f  rack1
UN  192.168.144.2  233.54 KiB  16      100.0%            5c3b3a31-6a8a-46fe-a32c-e6383b30ada6  rack1



In [18]:
#q9
# Repeat the max-temperature lookup and display only the reply's error string.
max_request_q9 = station_pb2.StationMaxRequest(station="USW00014837")
channel_stub.StationMax(max_request_q9).error

''

In [19]:
#q10
# Record one additional reading and display only the reply's error string.
new_reading = station_pb2.RecordTempsRequest(
    station="USW00014837",
    date="2023-03-12",
    tmin=10,
    tmax=90,
)
channel_stub.RecordTemps(new_reading).error

''