In [1]:
!nodetool status

Datacenter: datacenter1
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address     Load        Tokens  Owns (effective)  Host ID                               Rack 
UN  172.18.0.4  133.49 KiB  16      69.2%             9f4273a4-8316-4c6b-9033-a10c872d8058  rack1
UN  172.18.0.3  25.55 KiB   16      72.3%             3be4dc89-03dc-490e-ae7d-f9f97a2812d5  rack1
UN  172.18.0.2  25.55 KiB   16      58.4%             c76e0455-33cd-407f-b3e2-5e448f38b827  rack1



In [3]:
from cassandra.cluster import Cluster
cluster = Cluster(['p6-db-1', 'p6-db-2', 'p6-db-3'])
cass = cluster.connect()

In [4]:
cass.execute("DROP KEYSPACE IF EXISTS weather")

cass.execute("""CREATE KEYSPACE weather WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '3'}""")

cass.execute("use weather")
cass.execute("""CREATE TYPE station_record ( tmin int, tmax int)""")

cass.execute("""
CREATE TABLE stations(id text, name text static, date date, record station_record, PRIMARY KEY (id, date)) WITH CLUSTERING ORDER BY (date ASC)""")

<cassandra.cluster.ResultSet at 0x7f973015bfa0>

In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import expr
from datetime import datetime
current_date = datetime.now().date()
spark = (SparkSession.builder
         .appName("p6")
         .config('spark.jars.packages', 'com.datastax.spark:spark-cassandra-connector_2.12:3.4.0')
         .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
         .getOrCreate())

stations_df = spark.read.text("/nb/ghcnd-stations.txt")
stations_processed = stations_df.select(
    expr("substring(value, 1, 11)").alias("ID"),
    expr("substring(value, 39, 2)").alias("STATE"),
    expr("substring(value, 42, 30)").alias("NAME")
)
wi_stations = stations_processed.filter(stations_processed.STATE == "WI")
wi_stations_list = wi_stations.collect()


for station in wi_stations_list:
    cass.execute("""
        INSERT INTO stations (id,name)
        VALUES (%s, %s)
    """,
        (station.ID, station.NAME)
                )
val = cass.execute("""SELECT COUNT(*) FROM stations""")
describe_table_query = "DESCRIBE TABLE stations"
table_description = cass.execute(describe_table_query)

:: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f4337d88-a3f1-4a86-9770-4b2d7f27aa0b;1.0
	confs: [default]
	found com.datastax.spark#spark-cassandra-connector_2.12;3.4.0 in central
	found com.datastax.spark#spark-cassandra-connector-driver_2.12;3.4.0 in central
	found com.datastax.oss#java-driver-core-shaded;4.13.0 in central
	found com.datastax.oss#native-protocol;1.5.0 in central
	found com.datastax.oss#java-driver-shaded-guava;25.1-jre-graal-sub-1 in central
	found com.typesafe#config;1.4.1 in central
	found org.slf4j#slf4j-api;1.7.26 in central
	found io.dropwizard.metrics#metrics-core;4.1.18 in central
	found org.hdrhistogram#HdrHistogram;2.1.12 in central
	found org.reactivestreams#reactive-streams;1.0.3 in central
	found com.github.stephenc.jcip#jcip-annotations;1.0-1 in central
	found com.gith

In [6]:
#q1
print((cass.execute("describe table stations")).one().create_statement)

CREATE TABLE weather.stations (
    id text,
    date date,
    name text static,
    record station_record,
    PRIMARY KEY (id, date)
) WITH CLUSTERING ORDER BY (date ASC)
    AND additional_write_policy = '99p'
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND cdc = false
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND memtable = 'default'
    AND crc_check_chance = 1.0
    AND default_time_to_live = 0
    AND extensions = {}
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair = 'BLOCKING'
    AND speculative_retry = '99p';


In [7]:
#q2
query = "SELECT name FROM stations WHERE id = 'USW00014837'"
result = cass.execute(query)
name = result.one()[0]
name

'MADISON DANE CO RGNL AP       '

In [8]:
#q3
result = cass.execute("SELECT token(id) FROM stations WHERE id = 'USC00470273'")
station_token = result.one()[0]
station_token

-9014250178872933741

In [9]:
#q4
from subprocess import check_output

node_out = check_output(["nodetool", "ring"]).decode("utf-8")
tokens = []

for line in node_out.split("\n"):
    if 'Normal' in line:
        tok = int(line.split()[-1])  
        tokens.append(tok)
        
tokens.sort()

next_token = None

for tok in tokens:
    if tok > station_token:
        next_token = tok
        break

if next_token is None and tokens:
    next_token = tokens[0]

next_token

-9010925140513953854

In [10]:
#q5
import grpc
import station_pb2
import station_pb2_grpc
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date

spark = SparkSession.builder.appName("TemperatureRecords").getOrCreate()

df = spark.read.parquet("./records.parquet")
df_tmax = df.filter(df.element == 'TMAX') \
    .withColumn("date", to_date(col("date").cast("string"), 'yyyyMMdd')) \
    .withColumnRenamed("value", "tmax")

df_tmin = df.filter(df.element == 'TMIN') \
    .withColumn("date", to_date(col("date").cast("string"), 'yyyyMMdd')) \
    .withColumnRenamed("value", "tmin")

df_joined = df_tmax.join(df_tmin, ['station', 'date'])

channel = grpc.insecure_channel('localhost:5440')
stub = station_pb2_grpc.StationStub(channel)

for row in df_joined.collect():
    station_id = row["station"]
    date = row["date"].strftime("%Y-%m-%d") 
    tmax = int(row["tmax"])
    tmin = int(row["tmin"])
    request = station_pb2.RecordTempsRequest(
        station=station_id,
        date=date,
        tmin=tmin,
        tmax=tmax
    )
    response = stub.RecordTemps(request)

response = stub.StationMax(station_pb2.StationMaxRequest(station="USW00014837"))

if not response.error:
    max_temp = response.tmax
else:
    max_temp = "Error: " + response.error

max_temp


23/11/21 02:54:36 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

356

In [11]:
#q6
stationsDF = spark.read.format("org.apache.spark.sql.cassandra").option("spark.cassandra.connection.host", "p6-db-1,p6-db-2,p6-db-3").option("keyspace", "weather").option("table", "stations").load()
stationsDF.createOrReplaceTempView("stations")
tables = spark.catalog.listTables()
tables

[Table(name='stations', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [12]:
#q7
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col

stations_of_interest = ['USW00014839', 'USR0000WDDG', 'USW00014837', 'USW00014898']
df_filtered = df_joined.filter(col('station').isin(stations_of_interest))

df_with_diff = df_filtered.withColumn('temp_diff', col('tmax') - col('tmin'))

avg_diff_by_station = df_with_diff.groupBy('station').agg(avg('temp_diff').alias('avg_temp_diff'))

avg_temp_diff_dict = {row['station']: row['avg_temp_diff'] for row in avg_diff_by_station.collect()}

avg_temp_diff_dict

                                                                                

{'USW00014837': 105.62739726027397,
 'USW00014898': 102.93698630136986,
 'USW00014839': 89.6986301369863,
 'USR0000WDDG': 102.06849315068493}

23/11/21 02:55:13 WARN ChannelPool: [s0|p6-db-2/172.18.0.3:9042]  Error while opening new channel (ConnectionInitException: [s0|connecting...] Protocol initialization request, step 1 (STARTUP {CQL_VERSION=3.0.0, DRIVER_NAME=DataStax Java driver for Apache Cassandra(R), DRIVER_VERSION=4.13.0, CLIENT_ID=7354d1f0-63da-4ff7-bad1-a0ccba401920, APPLICATION_NAME=Spark-Cassandra-Connector-local-1700535141184}): failed to send request (java.nio.channels.NotYetConnectedException))


In [14]:
#q8
!nodetool status

23/11/21 02:55:28 WARN ChannelPool: [s0|p6-db-2/172.18.0.3:9042]  Error while opening new channel (ConnectionInitException: [s0|connecting...] Protocol initialization request, step 1 (STARTUP {CQL_VERSION=3.0.0, DRIVER_NAME=DataStax Java driver for Apache Cassandra(R), DRIVER_VERSION=4.13.0, CLIENT_ID=7354d1f0-63da-4ff7-bad1-a0ccba401920, APPLICATION_NAME=Spark-Cassandra-Connector-local-1700535141184}): failed to send request (java.nio.channels.NotYetConnectedException))


Datacenter: datacenter1
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address     Load       Tokens  Owns (effective)  Host ID                               Rack 
UN  172.18.0.4  86.05 KiB  16      100.0%            9f4273a4-8316-4c6b-9033-a10c872d8058  rack1
DN  172.18.0.3  95.87 KiB  16      100.0%            3be4dc89-03dc-490e-ae7d-f9f97a2812d5  rack1
UN  172.18.0.2  86.04 KiB  16      100.0%            c76e0455-33cd-407f-b3e2-5e448f38b827  rack1



In [24]:
#q9
channel = grpc.insecure_channel('localhost:5440')
stub = station_pb2_grpc.StationStub(channel)

request = station_pb2.StationMaxRequest(station="USR0000WDDG")

try:
    response = stub.StationMax(request)
    error_content = response.error 
except grpc.RpcError as e:
    error_content = e.details()
print(f"Error field content: {error_content}")

Error field content: need 3 replicas, but only have 2


In [25]:
#q10

stub = station_pb2_grpc.StationStub(channel)

request = station_pb2.RecordTempsRequest(station='blahblahblah',date='2023-04-01',tmin=15,tmax=25)

try:
    response = stub.RecordTemps(request)
    error_message = response.error
except grpc.RpcError as rpc_error:
    error_message = rpc_error.details()

print("error: "+ error_message)

error: 
