In [1]:
import pandas as pd

In [2]:
!nodetool status

Datacenter: datacenter1
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address     Load       Tokens  Owns (effective)  Host ID                               Rack 
UN  172.28.0.2  70.27 KiB  16      69.3%             f55dd3a2-f0f3-4843-9ac5-93d0329db7b5  rack1
UN  172.28.0.4  70.26 KiB  16      71.1%             d51e0c34-c932-41a1-a27f-36875e4a8357  rack1
UN  172.28.0.3  70.25 KiB  16      59.6%             f11b72f5-eee5-4521-adb8-73993b49ac1e  rack1



# Part 1

In [3]:
from cassandra.cluster import Cluster
cluster = Cluster(['p6-db-1', 'p6-db-2', 'p6-db-3'])
cass = cluster.connect()

### drop a weather keyspace if already exists

In [4]:
cass.execute("drop keyspace if exists weather")

<cassandra.cluster.ResultSet at 0x7fc8aedf0b80>

### create a weather keyspace with 3x replication

In [5]:
cass.execute("create keyspace weather with replication = {'class':'SimpleStrategy', 'replication_factor':3}")

<cassandra.cluster.ResultSet at 0x7fc87c09c040>

### inside weather, create a station_record type containing two ints: tmin and tmax

In [6]:
cass.execute("use weather")

<cassandra.cluster.ResultSet at 0x7fc87c0627d0>

In [7]:
cass.execute("""
create type station_record(
    tmin INT,
    tmax INT
)
""")

<cassandra.cluster.ResultSet at 0x7fc87c062ef0>

### inside weather, create a stations table

In [8]:
cass.execute("drop table if exists stations")

<cassandra.cluster.ResultSet at 0x7fc87d498d60>

In [9]:
# with help of chatbot: https://hf.co/chat/r/wMkXlBQ

cass.execute("""
CREATE TABLE stations(
    id TEXT,
    name TEXT STATIC,
    date date,
    record weather.station_record,
    PRIMARY KEY((id), date)
) WITH CLUSTERING ORDER BY (date ASC)
""")

<cassandra.cluster.ResultSet at 0x7fc87c063be0>

## Q1: What is the Schema of stations?

In [10]:
#q1
pd.DataFrame(cass.execute("describe table weather.stations"))['create_statement'][0]

"CREATE TABLE weather.stations (\n    id text,\n    date date,\n    name text static,\n    record station_record,\n    PRIMARY KEY (id, date)\n) WITH CLUSTERING ORDER BY (date ASC)\n    AND additional_write_policy = '99p'\n    AND bloom_filter_fp_chance = 0.01\n    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}\n    AND cdc = false\n    AND comment = ''\n    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}\n    AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}\n    AND memtable = 'default'\n    AND crc_check_chance = 1.0\n    AND default_time_to_live = 0\n    AND extensions = {}\n    AND gc_grace_seconds = 864000\n    AND max_index_interval = 2048\n    AND memtable_flush_period_in_ms = 0\n    AND min_index_interval = 128\n    AND read_repair = 'BLOCKING'\n    AND speculative_retry = '99p';"

In [11]:
from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .appName("p6")
         .config('spark.jars.packages', 'com.datastax.spark:spark-cassandra-connector_2.12:3.4.0')
         .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
         .getOrCreate())

:: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-4c9b4a6d-aa81-44fa-9176-36e63b5e97cc;1.0
	confs: [default]
	found com.datastax.spark#spark-cassandra-connector_2.12;3.4.0 in central
	found com.datastax.spark#spark-cassandra-connector-driver_2.12;3.4.0 in central
	found com.datastax.oss#java-driver-core-shaded;4.13.0 in central
	found com.datastax.oss#native-protocol;1.5.0 in central
	found com.datastax.oss#java-driver-shaded-guava;25.1-jre-graal-sub-1 in central
	found com.typesafe#config;1.4.1 in central
	found org.slf4j#slf4j-api;1.7.26 in central
	found io.dropwizard.metrics#metrics-core;4.1.18 in central
	found org.hdrhistogram#HdrHistogram;2.1.12 in central
	found org.reactivestreams#reactive-streams;1.0.3 in central
	found com.github.stephenc.jcip#jcip-annotations;1.0-1 in central
	found com.gith

In [12]:
df = spark.read.text("ghcnd-stations.txt")

In [13]:
'''
------------------------------
Variable   Columns   Type
------------------------------
ID            1-11   Character
LATITUDE     13-20   Real
LONGITUDE    22-30   Real
ELEVATION    32-37   Real
STATE        39-40   Character
NAME         42-71   Character
GSN FLAG     73-75   Character
HCN/CRN FLAG 77-79   Character
WMO ID       81-85   Character
------------------------------
'''

'\n------------------------------\nVariable   Columns   Type\n------------------------------\nID            1-11   Character\nLATITUDE     13-20   Real\nLONGITUDE    22-30   Real\nELEVATION    32-37   Real\nSTATE        39-40   Character\nNAME         42-71   Character\nGSN FLAG     73-75   Character\nHCN/CRN FLAG 77-79   Character\nWMO ID       81-85   Character\n------------------------------\n'

In [14]:
df.limit(3).toPandas()

                                                                                

Unnamed: 0,value
0,ACW00011604 17.1167 -61.7833 10.1 ST JO...
1,ACW00011647 17.1333 -61.7833 19.2 ST JO...
2,AE000041196 25.3330 55.5170 34.0 SHARJ...


In [15]:
# with help of chatbot: https://hf.co/chat/r/2wOv6kw

from pyspark.sql.functions import col, expr

df2 = df.withColumn("ID", expr("substring(value, 0, 11)"))
df2 = df2.withColumn("state", expr("substring(value, 39, 2)"))
df2 = df2.withColumn("name", expr("substring(value, 42, 30)"))

In [16]:
df2.limit(5).toPandas()

Unnamed: 0,value,ID,state,name
0,ACW00011604 17.1167 -61.7833 10.1 ST JO...,ACW00011604,,ST JOHNS COOLIDGE FLD
1,ACW00011647 17.1333 -61.7833 19.2 ST JO...,ACW00011647,,ST JOHNS
2,AE000041196 25.3330 55.5170 34.0 SHARJ...,AE000041196,,SHARJAH INTER. AIRP
3,AEM00041194 25.2550 55.3640 10.4 DUBAI...,AEM00041194,,DUBAI INTL
4,AEM00041217 24.4330 54.6510 26.8 ABU D...,AEM00041217,,ABU DHABI INTL


In [24]:
# spark.sql("DROP TABLE IF EXISTS weather.wi_table3")

In [25]:
# df2.write.saveAsTable("wi_table3", mode="overwrite")
# wipd = spark.sql("SELECT ID, state, name FROM wi_table3 WHERE LOWER(state) = 'wi'").toPandas()
# wipd.head()

In [26]:
# spark.sql("SHOW TABLES").show()

In [27]:
# len(wipd['ID'])

In [30]:
wipd = df2.filter(df2.state == "WI").toPandas()
wipd

Unnamed: 0,value,ID,state,name
0,US1WIAD0002 43.9544 -89.8096 294.4 WI ADAMS...,US1WIAD0002,WI,ADAMS 0.4 E
1,US1WIAD0005 44.2053 -89.8480 305.7 WI NEKOO...,US1WIAD0005,WI,NEKOOSA 8.0 SSE
2,US1WIAD0006 43.8858 -89.7259 307.8 WI GRAND...,US1WIAD0006,WI,GRAND MARSH 1.0 W
3,US1WIAD0008 43.8611 -89.7163 310.0 WI GRAND...,US1WIAD0008,WI,GRAND MARSH 1.9 SSW
4,US1WIAD0010 43.7864 -89.6417 293.8 WI OXFOR...,US1WIAD0010,WI,OXFORD 4.0 W
...,...,...,...,...
1308,USW00094930 43.9333 -90.2667 280.1 WI VOLK ...,USW00094930,WI,VOLK FLD ANG
1309,USW00094940 43.9667 -90.7333 252.7 WI SPART...,USW00094940,WI,SPARTA FT MCCOY
1310,USW00094973 46.0303 -91.4425 368.8 WI HAYWA...,USW00094973,WI,HAYWARD MUNI AP
1311,USW00094985 44.6378 -90.1875 381.6 WI MARSH...,USW00094985,WI,MARSHFIELD MUNI AP


In [31]:
for i in range(len(wipd['ID'])):
    curr_id = wipd['ID'][i]
    curr_name = wipd['name'][i]
    curr_name = curr_name.replace("\'", "")
    query = f"INSERT INTO stations (id, name) VALUES ('{curr_id}', '{curr_name}')"
    cass.execute(query)

In [32]:
#q2
pd.DataFrame(cass.execute("select name from stations where id = 'USW00014837'"))['name'][0]

'MADISON DANE CO RGNL AP       '

In [33]:
#q3
u_token = pd.DataFrame(cass.execute("select token(id) from stations where id = 'USC00470273'"))['system_token_id'][0]
u_token

-9014250178872933741

In [34]:
'''
Draft:
lines = s.splitlines()
inside = False
for i in range(6, (len(lines)-6)):
    info = lines[i].split("             ")
    curr_token = int(info[1])
    if curr_token > 8886149105717214077:
        inside = True
        print(curr_token)
        break
if not inside:
    info = lines[6].split("             ")
    print(info[1])
'''

'\nDraft:\nlines = s.splitlines()\ninside = False\nfor i in range(6, (len(lines)-6)):\n    info = lines[i].split("             ")\n    curr_token = int(info[1])\n    if curr_token > 8886149105717214077:\n        inside = True\n        print(curr_token)\n        break\nif not inside:\n    info = lines[6].split("             ")\n    print(info[1])\n'

In [35]:
import subprocess
ring = subprocess.check_output("docker exec -it p6-db-1 bash | nodetool ring", shell=True).decode().splitlines()
ring

/bin/sh: 1: docker: not found


['',
 'Datacenter: datacenter1',
 'Address          Rack        Status State   Load            Owns                Token                                       ',
 '                                                                                8888436893293965475                         ',
 '172.28.0.4       rack1       Up     Normal  87.74 KiB       100.00%             -8976761085217265198                        ',
 '172.28.0.2       rack1       Up     Normal  87.75 KiB       100.00%             -8740635247803235345                        ',
 '172.28.0.3       rack1       Up     Normal  87.73 KiB       100.00%             -8423175724535584051                        ',
 '172.28.0.4       rack1       Up     Normal  87.74 KiB       100.00%             -7905260468877232385                        ',
 '172.28.0.2       rack1       Up     Normal  87.75 KiB       100.00%             -7664879366577655805                        ',
 '172.28.0.3       rack1       Up     Normal  87.73 KiB       10

In [36]:
#q4
inside = False
for i in range(5, (len(ring)-6)):
    curr_token = int(ring[i][80:])
    if curr_token > u_token:
        inside = True
        print(curr_token)
        break
if not inside:
    info = ring[5].split("             ")
    print(info[1])

-8976761085217265198


# Part 2

In [37]:
! unzip -n records.zip

Archive:  records.zip


In [38]:
! ls -la

total 10496
drwxrwxr-x 6 1001 1002     4096 Nov 21 18:19 .
drwxr-xr-x 1 root root     4096 Nov 21 18:03 ..
drwxr-xr-x 2 root root     4096 Nov 21 16:24 .ipynb_checkpoints
drwxr-xr-x 2 root root     4096 Nov 21 01:38 __pycache__
-rw-rw-r-- 1 1001 1002 10607756 Nov 19 22:23 ghcnd-stations.txt
-rw-r--r-- 1 root root    50333 Nov 21 18:19 p6.ipynb
drwxr-xr-x 2 root root     4096 Nov  5 01:54 records.parquet
-rw-rw-r-- 1 1001 1002    44610 Nov 19 22:23 records.zip
-rw-rw-r-- 1 1001 1002     3380 Nov 21 17:59 server.py
drwxr-xr-x 9 root root     4096 Nov 21 16:15 spark-warehouse
-rw-rw-r-- 1 1001 1002      510 Nov 19 22:23 station.proto
-rw-rw-r-- 1 1001 1002     1917 Nov 20 04:32 station_pb2.py
-rw-rw-r-- 1 1001 1002     3884 Nov 20 04:32 station_pb2_grpc.py


In [39]:
rec = spark.read.format("parquet").load("records.parquet")

                                                                                

In [40]:
rec.show()

+-----------+--------+-------+------+
|    station|    date|element| value|
+-----------+--------+-------+------+
|USW00014898|20220101|   TMAX| -32.0|
|USW00014898|20220102|   TMAX| -77.0|
|USW00014898|20220103|   TMAX| -60.0|
|USW00014898|20220104|   TMAX|   0.0|
|USW00014898|20220105|   TMAX| -16.0|
|USW00014898|20220106|   TMAX| -71.0|
|USW00014898|20220107|   TMAX| -71.0|
|USW00014898|20220108|   TMAX| -32.0|
|USW00014898|20220109|   TMAX| -27.0|
|USW00014898|20220110|   TMAX|-149.0|
|USW00014898|20220111|   TMAX| -16.0|
|USW00014898|20220112|   TMAX|   6.0|
|USW00014898|20220113|   TMAX|  11.0|
|USW00014898|20220114|   TMAX| -77.0|
|USW00014898|20220115|   TMAX| -99.0|
|USW00014898|20220116|   TMAX| -60.0|
|USW00014898|20220117|   TMAX| -21.0|
|USW00014898|20220118|   TMAX|  28.0|
|USW00014898|20220119|   TMAX|  28.0|
|USW00014898|20220120|   TMAX|-121.0|
+-----------+--------+-------+------+
only showing top 20 rows



                                                                                

In [41]:
rec.groupBy("station", "date").pivot("element", ["TMIN", "TMAX"]).sum("value").show()

[Stage 7:>                                                          (0 + 2) / 2]

+-----------+--------+------+------+
|    station|    date|  TMIN|  TMAX|
+-----------+--------+------+------+
|USW00014898|20220107|-166.0| -71.0|
|USW00014839|20220924| 117.0| 194.0|
|USW00014839|20220523|  83.0| 150.0|
|USW00014839|20221019|  11.0|  83.0|
|USW00014839|20220529| 139.0| 261.0|
|USR0000WDDG|20221130|-106.0| -39.0|
|USR0000WDDG|20220119|-178.0| -56.0|
|USW00014837|20220222| -88.0| -38.0|
|USR0000WDDG|20220202|-150.0|-106.0|
|USW00014839|20220427|   0.0|  39.0|
|USW00014839|20220708| 189.0| 222.0|
|USW00014839|20220917| 200.0| 294.0|
|USW00014837|20220624| 200.0| 322.0|
|USW00014898|20220129|-116.0| -60.0|
|USW00014839|20220715| 156.0| 233.0|
|USR0000WDDG|20220224|-128.0| -61.0|
|USR0000WDDG|20220130|-117.0| -33.0|
|USR0000WDDG|20220414| -17.0|  50.0|
|USW00014898|20220728| 156.0| 256.0|
|USW00014837|20220906| 117.0| 256.0|
+-----------+--------+------+------+
only showing top 20 rows



                                                                                

In [42]:
minmax = rec.groupBy("station", "date").pivot("element", ["TMIN", "TMAX"]).sum("value").toPandas()
minmax

                                                                                

Unnamed: 0,station,date,TMIN,TMAX
0,USW00014898,20220107,-166.0,-71.0
1,USW00014839,20220924,117.0,194.0
2,USW00014839,20220523,83.0,150.0
3,USW00014839,20221019,11.0,83.0
4,USW00014839,20220529,139.0,261.0
...,...,...,...,...
1455,USW00014898,20220724,167.0,278.0
1456,USW00014837,20221004,50.0,222.0
1457,USW00014837,20221107,17.0,94.0
1458,USW00014898,20221006,56.0,200.0


In [43]:
import grpc
import station_pb2, station_pb2_grpc
from concurrent import futures

channel = grpc.insecure_channel("localhost:5440")
stub = station_pb2_grpc.StationStub(channel)

In [44]:
for i in range(len(minmax['station'])):
    curr_station = minmax['station'][i]
    curr_date_raw = minmax['date'][i]
    curr_date = curr_date_raw[0:4] + "-" + curr_date_raw[4:6] + "-" + curr_date_raw[6:]
    curr_tmin = minmax['TMIN'][i]
    curr_tmax = minmax['TMAX'][i]

    rt_err = stub.RecordTemps(station_pb2.RecordTempsRequest(station = curr_station, date = curr_date, tmin = int(curr_tmin), tmax = int(curr_tmax)))

In [45]:
pd.DataFrame(cass.execute("select * from stations where id = 'USW00014837'"))

Unnamed: 0,id,date,name,record
0,USW00014837,2022-01-01,MADISON DANE CO RGNL AP,"(-99, -32)"
1,USW00014837,2022-01-02,MADISON DANE CO RGNL AP,"(-166, -82)"
2,USW00014837,2022-01-03,MADISON DANE CO RGNL AP,"(-177, -66)"
3,USW00014837,2022-01-04,MADISON DANE CO RGNL AP,"(-88, -5)"
4,USW00014837,2022-01-05,MADISON DANE CO RGNL AP,"(-116, -5)"
...,...,...,...,...
360,USW00014837,2022-12-27,MADISON DANE CO RGNL AP,"(-167, -50)"
361,USW00014837,2022-12-28,MADISON DANE CO RGNL AP,"(-50, 50)"
362,USW00014837,2022-12-29,MADISON DANE CO RGNL AP,"(44, 128)"
363,USW00014837,2022-12-30,MADISON DANE CO RGNL AP,"(-67, 117)"


In [46]:
int(pd.DataFrame(cass.execute("select record.tmax from stations where id = 'USW00014837'"))['record_tmax'].max())

356

In [47]:
#q5
resp = stub.StationMax(station_pb2.StationMaxRequest(station = "USW00014837"))
print(resp.tmax)
# print(resp.error)

356


# Part 3

In [48]:
(spark.read.format("org.apache.spark.sql.cassandra")
.option("spark.cassandra.connection.host", "p6-db-1,p6-db-2,p6-db-3")
.option("keyspace", "weather")
.option("table", "stations")
.load().createOrReplaceTempView("stations"))

In [49]:
#q6
spark.catalog.listTables()

[Table(name='stations', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [50]:
rec.groupBy("station", "date").pivot("element", ["TMIN", "TMAX"]).sum("value").show()

+-----------+--------+------+------+
|    station|    date|  TMIN|  TMAX|
+-----------+--------+------+------+
|USW00014898|20220107|-166.0| -71.0|
|USW00014839|20220924| 117.0| 194.0|
|USW00014839|20220523|  83.0| 150.0|
|USW00014839|20221019|  11.0|  83.0|
|USW00014839|20220529| 139.0| 261.0|
|USR0000WDDG|20221130|-106.0| -39.0|
|USR0000WDDG|20220119|-178.0| -56.0|
|USW00014837|20220222| -88.0| -38.0|
|USR0000WDDG|20220202|-150.0|-106.0|
|USW00014839|20220427|   0.0|  39.0|
|USW00014839|20220708| 189.0| 222.0|
|USW00014839|20220917| 200.0| 294.0|
|USW00014837|20220624| 200.0| 322.0|
|USW00014898|20220129|-116.0| -60.0|
|USW00014839|20220715| 156.0| 233.0|
|USR0000WDDG|20220224|-128.0| -61.0|
|USR0000WDDG|20220130|-117.0| -33.0|
|USR0000WDDG|20220414| -17.0|  50.0|
|USW00014898|20220728| 156.0| 256.0|
|USW00014837|20220906| 117.0| 256.0|
+-----------+--------+------+------+
only showing top 20 rows



In [51]:
rec2 = rec.groupBy("station", "date").pivot("element", ["TMIN", "TMAX"]).sum("value")
rec2 = rec2.withColumn("diff", rec2.TMAX - rec2.TMIN)
rec2.show()

+-----------+--------+------+------+-----+
|    station|    date|  TMIN|  TMAX| diff|
+-----------+--------+------+------+-----+
|USW00014898|20220107|-166.0| -71.0| 95.0|
|USW00014839|20220924| 117.0| 194.0| 77.0|
|USW00014839|20220523|  83.0| 150.0| 67.0|
|USW00014839|20221019|  11.0|  83.0| 72.0|
|USW00014839|20220529| 139.0| 261.0|122.0|
|USR0000WDDG|20221130|-106.0| -39.0| 67.0|
|USR0000WDDG|20220119|-178.0| -56.0|122.0|
|USW00014837|20220222| -88.0| -38.0| 50.0|
|USR0000WDDG|20220202|-150.0|-106.0| 44.0|
|USW00014839|20220427|   0.0|  39.0| 39.0|
|USW00014839|20220708| 189.0| 222.0| 33.0|
|USW00014839|20220917| 200.0| 294.0| 94.0|
|USW00014837|20220624| 200.0| 322.0|122.0|
|USW00014898|20220129|-116.0| -60.0| 56.0|
|USW00014839|20220715| 156.0| 233.0| 77.0|
|USR0000WDDG|20220224|-128.0| -61.0| 67.0|
|USR0000WDDG|20220130|-117.0| -33.0| 84.0|
|USR0000WDDG|20220414| -17.0|  50.0| 67.0|
|USW00014898|20220728| 156.0| 256.0|100.0|
|USW00014837|20220906| 117.0| 256.0|139.0|
+----------

In [52]:
station1 = rec2.filter(rec2.station == 'USW00014839')
station1.show()

+-----------+--------+------+-----+-----+
|    station|    date|  TMIN| TMAX| diff|
+-----------+--------+------+-----+-----+
|USW00014839|20220924| 117.0|194.0| 77.0|
|USW00014839|20220523|  83.0|150.0| 67.0|
|USW00014839|20221019|  11.0| 83.0| 72.0|
|USW00014839|20220529| 139.0|261.0|122.0|
|USW00014839|20220427|   0.0| 39.0| 39.0|
|USW00014839|20220708| 189.0|222.0| 33.0|
|USW00014839|20220917| 200.0|294.0| 94.0|
|USW00014839|20220715| 156.0|233.0| 77.0|
|USW00014839|20221202|   6.0|111.0|105.0|
|USW00014839|20221211|  28.0| 56.0| 28.0|
|USW00014839|20220420|  44.0|106.0| 62.0|
|USW00014839|20220116| -99.0|-21.0| 78.0|
|USW00014839|20220422|  56.0| 78.0| 22.0|
|USW00014839|20220504|  39.0|106.0| 67.0|
|USW00014839|20221228| -39.0| 83.0|122.0|
|USW00014839|20220615| 244.0|350.0|106.0|
|USW00014839|20221129|  17.0|150.0|133.0|
|USW00014839|20221001|  94.0|189.0| 95.0|
|USW00014839|20221111|   6.0|178.0|172.0|
|USW00014839|20220109|-116.0|  0.0|116.0|
+-----------+--------+------+-----

In [53]:
from pyspark.sql.functions import avg

In [54]:
avg1 = station1.select(avg(station1.diff)).toPandas()['avg(diff)'][0]
avg1

89.6986301369863

In [55]:
station2 = rec2.filter(rec2.station == 'USR0000WDDG')
avg2 = station2.select(avg(station1.diff)).toPandas()['avg(diff)'][0]
avg2

102.06849315068493

In [56]:
station3 = rec2.filter(rec2.station == 'USW00014837')
avg3 = station3.select(avg(station1.diff)).toPandas()['avg(diff)'][0]
avg3

105.62739726027397

In [57]:
station4 = rec2.filter(rec2.station == 'USW00014898')
avg4 = station4.select(avg(station1.diff)).toPandas()['avg(diff)'][0]
avg4

102.93698630136986

In [58]:
#q7
dict = {'USW00014839':avg1, 'USR0000WDDG':avg2, 'USW00014837':avg3, 'USW00014898':avg4}
dict

{'USW00014839': 89.6986301369863,
 'USR0000WDDG': 102.06849315068493,
 'USW00014837': 105.62739726027397,
 'USW00014898': 102.93698630136986}

# Part 4

In [62]:
#q8
! nodetool status

23/11/21 18:22:25 WARN ChannelPool: [s0|p6-db-2/172.28.0.2:9042]  Error while opening new channel (ConnectionInitException: [s0|connecting...] Protocol initialization request, step 1 (STARTUP {CQL_VERSION=3.0.0, DRIVER_NAME=DataStax Java driver for Apache Cassandra(R), DRIVER_VERSION=4.13.0, CLIENT_ID=a8eabff7-7506-4917-a1be-43c27e53b24f, APPLICATION_NAME=Spark-Cassandra-Connector-local-1700589986518}): failed to send request (java.nio.channels.NotYetConnectedException))


Datacenter: datacenter1
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address     Load       Tokens  Owns (effective)  Host ID                               Rack 
DN  172.28.0.2  87.75 KiB  16      100.0%            f55dd3a2-f0f3-4843-9ac5-93d0329db7b5  rack1
UN  172.28.0.4  87.74 KiB  16      100.0%            d51e0c34-c932-41a1-a27f-36875e4a8357  rack1
UN  172.28.0.3  87.73 KiB  16      100.0%            f11b72f5-eee5-4521-adb8-73993b49ac1e  rack1



In [66]:
#q9
resp = stub.StationMax(station_pb2.StationMaxRequest(station = "USW00014837"))
resp.error

'need 3 replicas, but only have 2'

In [67]:
#q10
rt_err = stub.RecordTemps(station_pb2.RecordTempsRequest(station = "USW00014837", date = "2023-02-02", tmin = int(0), tmax = int(10)))
rt_err.error

''