In [1]:
import urllib.request
import zipfile
import os

In [2]:
!nodetool status

Datacenter: datacenter1
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address        Load        Tokens  Owns (effective)  Host ID                               Rack 
UN  192.168.144.3  113.18 KiB  16      100.0%            787e8e70-a1bb-4420-bba4-36d04cb23f41  rack1
UN  192.168.144.4  81.58 KiB   16      100.0%            1816a59e-9a47-4bdb-a23b-619e3bd689c1  rack1
UN  192.168.144.2  149.48 KiB  16      100.0%            e1443135-7528-46df-858c-5738078c7f44  rack1



In [3]:
from cassandra.cluster import Cluster
cluster = Cluster(['p6-db-1', 'p6-db-2', 'p6-db-3'])
cass = cluster.connect()

In [4]:
cass.execute("DROP KEYSPACE IF EXISTS weather")
cass.execute("""
    CREATE KEYSPACE weather
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3}
""")

cass.execute("""
    CREATE TYPE weather.station_record (
        tmin int,
        tmax int
    )
""")
cass.execute("""
    CREATE TABLE weather.stations (
    id text,
    date date,
    name text STATIC,
    record weather.station_record,
    PRIMARY KEY (id, date)
) WITH CLUSTERING ORDER BY (date ASC);
""")


<cassandra.cluster.ResultSet at 0x7fb920721ff0>

In [5]:
#q1: What is the Schema of stations?
cass.execute("""
DESCRIBE TABLE weather.stations
""").one().create_statement

"CREATE TABLE weather.stations (\n    id text,\n    date date,\n    name text static,\n    record station_record,\n    PRIMARY KEY (id, date)\n) WITH CLUSTERING ORDER BY (date ASC)\n    AND additional_write_policy = '99p'\n    AND bloom_filter_fp_chance = 0.01\n    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}\n    AND cdc = false\n    AND comment = ''\n    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}\n    AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}\n    AND memtable = 'default'\n    AND crc_check_chance = 1.0\n    AND default_time_to_live = 0\n    AND extensions = {}\n    AND gc_grace_seconds = 864000\n    AND max_index_interval = 2048\n    AND memtable_flush_period_in_ms = 0\n    AND min_index_interval = 128\n    AND read_repair = 'BLOCKING'\n    AND speculative_retry = '99p';"

In [6]:
from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .appName("p6")
         .config('spark.jars.packages', 'com.datastax.spark:spark-cassandra-connector_2.12:3.4.0')
         .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
         .getOrCreate())

:: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-b0c3b34b-b682-4df0-b7af-94f3a33cee99;1.0
	confs: [default]
	found com.datastax.spark#spark-cassandra-connector_2.12;3.4.0 in central
	found com.datastax.spark#spark-cassandra-connector-driver_2.12;3.4.0 in central
	found com.datastax.oss#java-driver-core-shaded;4.13.0 in central
	found com.datastax.oss#native-protocol;1.5.0 in central
	found com.datastax.oss#java-driver-shaded-guava;25.1-jre-graal-sub-1 in central
	found com.typesafe#config;1.4.1 in central
	found org.slf4j#slf4j-api;1.7.26 in central
	found io.dropwizard.metrics#metrics-core;4.1.18 in central
	found org.hdrhistogram#HdrHistogram;2.1.12 in central
	found org.reactivestreams#reactive-streams;1.0.3 in central
	found com.github.stephenc.jcip#jcip-annotations;1.0-1 in central
	found com.gith

In [7]:
from pyspark.sql.functions import col, expr
df = spark.read.text("ghcnd-stations.txt")
df2 = df.withColumns({'station': expr("substring(value,1,11)"), 'state': expr("substring(value,39,2)"), 'name': expr("substring(value,42,30)")})
wisco_df = df2.where(col("state") == "WI")
wisco_list = wisco_df.collect()

for row in wisco_list:
    id = row['station'].strip()
    name = row['name'].strip()
    cass.execute("""
    INSERT INTO weather.stations (id, name)
    VALUES (%s, %s)
    """, (id, name))

                                                                                

In [8]:
cass.execute("""
SELECT COUNT(*) FROM weather.stations
""").one()

Row(count=1313)

In [9]:
#q2: what is the name corresponding to station ID USW00014837?
cass.execute("""
select name FROM weather.stations WHERE id = 'USW00014837'
""").one()[0]

'MADISON DANE CO RGNL AP'

In [10]:
#q3: what is the token for the USC00470273 station
cass.execute("""
SELECT token(id), id FROM weather.stations WHERE id = 'USC00470273'
""").one()[0]

-9014250178872933741

In [11]:
#q4: what is the first vnode token in the ring following the token for USC00470273?
import subprocess
wi_quer = cass.execute("""
SELECT token(id), id FROM weather.stations WHERE id = 'USC00470273'
""").one()[0]
output = subprocess.check_output(['nodetool', 'ring']).decode('utf-8')
vnode_tokens = []
for line in output.split('\n'):
    parts = line.split()
    if len(parts) > 7:
        try:
            vnode_tokens.append(int(parts[7]))
        except ValueError:
            continue
vnode_tokens.sort()
next_token = None
for token in vnode_tokens:
    if token > wi_quer:
        next_token = token
        break

if next_token is None and vnode_tokens:
    next_token = vnode_tokens[0]
next_token

-8939171438556067363

In [12]:
from pyspark.sql import SparkSession
import zipfile
import os

# Local file paths
zip_file_path = "records.zip"
extracted_dir_path = "records.parquet"

with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    zip_ref.extractall(extracted_dir_path)

spark = SparkSession.builder.appName("WeatherData").getOrCreate()

parquet_path = os.path.join(extracted_dir_path, "records.parquet")
df = spark.read.parquet(parquet_path)

pivot_df = df.groupBy("station", "date").pivot("element").agg({"value": "first"})

pivot_df = pivot_df.selectExpr(
    "station",
    "date",
    "`tmin` as tmin",
    "`tmax` as tmax"
)

print("Rearranged DataFrame:")
pivot_df.show()

23/12/14 15:21:07 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

Rearranged DataFrame:


23/12/14 15:21:11 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 12:>                                                         (0 + 1) / 1]

+-----------+--------+------+------+
|    station|    date|  tmin|  tmax|
+-----------+--------+------+------+
|USW00014898|20220107|-166.0| -71.0|
|USW00014839|20220924| 117.0| 194.0|
|USW00014839|20220523|  83.0| 150.0|
|USW00014839|20221019|  11.0|  83.0|
|USW00014839|20220529| 139.0| 261.0|
|USR0000WDDG|20221130|-106.0| -39.0|
|USR0000WDDG|20220119|-178.0| -56.0|
|USW00014837|20220222| -88.0| -38.0|
|USR0000WDDG|20220202|-150.0|-106.0|
|USW00014839|20220427|   0.0|  39.0|
|USW00014839|20220708| 189.0| 222.0|
|USW00014839|20220917| 200.0| 294.0|
|USW00014837|20220624| 200.0| 322.0|
|USW00014898|20220129|-116.0| -60.0|
|USW00014839|20220715| 156.0| 233.0|
|USR0000WDDG|20220224|-128.0| -61.0|
|USR0000WDDG|20220130|-117.0| -33.0|
|USR0000WDDG|20220414| -17.0|  50.0|
|USW00014898|20220728| 156.0| 256.0|
|USW00014837|20220906| 117.0| 256.0|
+-----------+--------+------+------+
only showing top 20 rows



                                                                                

In [13]:
from server import *

In [14]:
# servicer_instance = StationServicer()
data_collect = pivot_df.collect()

channel = grpc.insecure_channel("localhost:5440")
stub = station_pb2_grpc.StationStub(channel)

for row in data_collect:
    request = station_pb2.RecordTempsRequest(
    station=row["station"],
    date=row["date"],
    tmin=int(row["tmin"]),
    tmax=int(row["tmax"])
)
    response = stub.RecordTemps(request)
    



In [15]:
import pandas as pd
pd.DataFrame(cass.execute("select * from weather.stations limit 10"))

Unnamed: 0,id,date,name,record
0,USC00479053,,W BEND FIRE STN #2,
1,USC00476398,,PARK FALLS DNR HQ,
2,USC00470268,,APPOLONIA,
3,USC00474110,,JUNEAU,
4,USC00475525,,MINONG 5 WSW,
5,US1WIMN0013,,TOMAH 7.5 SSW,
6,US1WIWK0039,,OCONOMOWOC 0.6 N,
7,US1WIBR0019,,RICE LAKE 5.8 N,
8,USC00472626,,EPHRAIM 1NE-WWTP,
9,USC00477174,,RIDGELAND 1NNE,


In [16]:
#q5: what is the max temperature ever seen for station USW00014837?
request = station_pb2.StationMaxRequest(station='USW00014837')
response = stub.StationMax(request)
response.tmax

356

In [17]:
stations_df = spark.read.format("org.apache.spark.sql.cassandra") \
.option("spark.cassandra.connection.host", "p6-db-1,p6-db-2,p6-db-3") \
.option("keyspace", "weather") \
.option("table", "stations") \
.load()
stations_df.createOrReplaceTempView("stations")
result = spark.sql("SELECT * FROM stations")
result.show()

+-----------+----+------+--------------------+
|         id|date|record|                name|
+-----------+----+------+--------------------+
|US1WIMW0043|null|  null|   BROWN DEER 0.8 NW|
|US1WIRC0016|null|  null|   WATERFORD 1.0 SSW|
|US1WIOG0030|null|  null|     APPLETON 2.5 NW|
|USC00470660|null|  null|          BELLEVILLE|
|US1WIDA0062|null|  null|      WAUNAKEE 4.9 W|
|USC00477182|null|  null|            RIDGEWAY|
|US1WIOG0028|null|  null| BLACK CREEK 5.6 WNW|
|US1WIWK0067|null|  null|    WAUKESHA 1.8 ENE|
|USC00476392|null|  null|    PARDEEVILLE WWTP|
|USC00474375|null|  null|   LA CROSSE WB CITY|
|USC00478190|null|  null|STOCKBRIDGE-MUNSE...|
|US1WIWK0085|null|  null|  BROOKFIELD 2.5 WNW|
|USC00474295|null|  null|           KOEPENICK|
|US1WIJF0003|null|  null|  LAKE MILLS 3.6 WNW|
|US1WIBN0008|null|  null|       DE PERE 2.4 W|
|US1WIDA0049|null|  null|      VERONA 5.5 WNW|
|US1WIMW0028|null|  null|    FRANKLIN 2.4 ENE|
|USC00470645|null|  null|     BEAVER DAM WWTP|
|US1WISB0017|

                                                                                

In [18]:
#q6: what tables/views are available in the Spark catalog?
spark.catalog.listTables()

[Table(name='stations', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [19]:
#q7: what is the average difference between tmax and tmin, for each of the four stations that have temperature records?
df_no_missing_values = spark.sql("""
    SELECT *
    FROM stations
    WHERE id IS NOT NULL AND date IS NOT NULL AND record IS NOT NULL
""")
df_no_missing_values.createOrReplaceTempView("no_na")
# df_no_na = result.na.drop()
averages = spark.sql("SELECT id, AVG(record.tmax - record.tmin) AS AVG FROM no_na GROUP BY id")
q7_dict = {}
for row in averages.collect():
    q7_dict[row['id']] = row['AVG']
q7_dict

                                                                                

{'USR0000WDDG': 102.06849315068493,
 'USW00014839': 89.6986301369863,
 'USW00014837': 105.62739726027397,
 'USW00014898': 102.93698630136986}

In [21]:
#q8: what does nodetool status output?
! nodetool status

E1214 15:22:21.167861038    1688 backup_poller.cc:127]                 Run client channel backup poller: UNKNOWN:pollset_work {created_time:"2023-12-14T15:22:21.167618096+00:00", children:[UNKNOWN:Bad file descriptor {created_time:"2023-12-14T15:22:21.167499034+00:00", errno:9, os_error:"Bad file descriptor", syscall:"epoll_wait"}]}


23/12/14 15:22:23 WARN ChannelPool: [s0|p6-db-2/192.168.144.3:9042]  Error while opening new channel (ConnectionInitException: [s0|connecting...] Protocol initialization request, step 1 (STARTUP {CQL_VERSION=3.0.0, DRIVER_NAME=DataStax Java driver for Apache Cassandra(R), DRIVER_VERSION=4.13.0, CLIENT_ID=2324e1e1-4d02-4a0b-9594-cfc367f68e32, APPLICATION_NAME=Spark-Cassandra-Connector-local-1702567241052}): failed to send request (java.nio.channels.NotYetConnectedException))


Datacenter: datacenter1
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address        Load        Tokens  Owns (effective)  Host ID                               Rack 
UN  192.168.144.3  225.72 KiB  16      100.0%            787e8e70-a1bb-4420-bba4-36d04cb23f41  rack1
UN  192.168.144.4  211.75 KiB  16      100.0%            1816a59e-9a47-4bdb-a23b-619e3bd689c1  rack1
UN  192.168.144.2  248.35 KiB  16      100.0%            e1443135-7528-46df-858c-5738078c7f44  rack1



23/12/14 15:22:30 WARN ChannelPool: [s0|p6-db-2/192.168.144.3:9042]  Error while opening new channel (ConnectionInitException: [s0|connecting...] Protocol initialization request, step 1 (STARTUP {CQL_VERSION=3.0.0, DRIVER_NAME=DataStax Java driver for Apache Cassandra(R), DRIVER_VERSION=4.13.0, CLIENT_ID=2324e1e1-4d02-4a0b-9594-cfc367f68e32, APPLICATION_NAME=Spark-Cassandra-Connector-local-1702567241052}): failed to send request (java.nio.channels.NotYetConnectedException))


In [22]:
max_statement = cass.prepare("SELECT MAX(record.tmax) FROM weather.stations WHERE id = ?")

max_statement.consistency_level = ConsistencyLevel.THREE

try:
    resp = cass.execute(max_statement, ('USW00014839',))
except Unavailable as e:
    error_msg = f'need {e.required_replicas} replicas, but only have {e.alive_replicas}'
    print(error_msg)

23/12/14 15:22:38 WARN ChannelPool: [s0|p6-db-2/192.168.144.3:9042]  Error while opening new channel (ConnectionInitException: [s0|connecting...] Protocol initialization request, step 1 (STARTUP {CQL_VERSION=3.0.0, DRIVER_NAME=DataStax Java driver for Apache Cassandra(R), DRIVER_VERSION=4.13.0, CLIENT_ID=2324e1e1-4d02-4a0b-9594-cfc367f68e32, APPLICATION_NAME=Spark-Cassandra-Connector-local-1702567241052}): failed to send request (java.nio.channels.NotYetConnectedException))


ReadTimeout: Error from server: code=1200 [Coordinator node timed out waiting for replica nodes' responses] message="Operation timed out - received only 2 responses." info={'consistency': 'THREE', 'required_responses': 3, 'received_responses': 2}

23/12/14 15:22:50 WARN ChannelPool: [s0|p6-db-2/192.168.144.3:9042]  Error while opening new channel (ConnectionInitException: [s0|connecting...] Protocol initialization request, step 1 (STARTUP {CQL_VERSION=3.0.0, DRIVER_NAME=DataStax Java driver for Apache Cassandra(R), DRIVER_VERSION=4.13.0, CLIENT_ID=2324e1e1-4d02-4a0b-9594-cfc367f68e32, APPLICATION_NAME=Spark-Cassandra-Connector-local-1702567241052}): failed to send request (java.nio.channels.NotYetConnectedException))


In [23]:
#q9: if you make a StationMax RPC call, what does the error field contain in StationMaxReply reply?
request = station_pb2.StationMaxRequest(station='USR0000WDDG')
response = stub.StationMax(request)
response.error

'need 3 replicas, but only have 2'



In [24]:
#q10: if you make a RecordTempsRequest RPC call, what does error contain in the RecordTempsReply reply?
request = station_pb2.RecordTempsRequest(
station='USW00014839',
date='20220917',
tmin=int(-36),
tmax=int(-6)
)
response = stub.RecordTemps(request)

23/12/14 15:23:08 WARN ChannelPool: [s0|p6-db-2/192.168.144.3:9042]  Error while opening new channel (ConnectionInitException: [s0|connecting...] Protocol initialization request, step 1 (STARTUP {CQL_VERSION=3.0.0, DRIVER_NAME=DataStax Java driver for Apache Cassandra(R), DRIVER_VERSION=4.13.0, CLIENT_ID=2324e1e1-4d02-4a0b-9594-cfc367f68e32, APPLICATION_NAME=Spark-Cassandra-Connector-local-1702567241052}): failed to send request (com.datastax.oss.driver.shaded.netty.channel.StacklessClosedChannelException))
