In [1]:
!nodetool status

Datacenter: datacenter1
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address        Load        Tokens  Owns (effective)  Host ID                               Rack 
UN  192.168.192.4  242.4 KiB   16      75.3%             3f2a2574-f83c-466f-9a52-116ca6cdd389  rack1
UN  192.168.192.3  70.26 KiB   16      61.2%             4007d7ae-7d10-4672-98c2-ddbb85060db9  rack1
UN  192.168.192.2  242.41 KiB  16      63.5%             93d445fb-58a5-41b3-93f4-29298c2f480d  rack1



In [2]:
from cassandra.cluster import Cluster
cluster = Cluster(['p6-db-1', 'p6-db-2', 'p6-db-3'])
cass = cluster.connect()

In [3]:
cass.execute("drop keyspace if exists weather")
cass.execute("create keyspace weather with replication={'class': 'SimpleStrategy', 'replication_factor': 3};")
cass.execute("use weather")

<cassandra.cluster.ResultSet at 0x7fd2e19c95a0>

In [4]:
cass.execute("""
CREATE TYPE station_record (tmin INT, tmax INT)
""")

<cassandra.cluster.ResultSet at 0x7fd2e19c8d00>

In [5]:
cass.execute("drop table if exists stations")

<cassandra.cluster.ResultSet at 0x7fd2e18f2da0>

In [6]:
cass.execute("""
CREATE table stations(
    id TEXT,
    name TEXT static,
    date DATE,
    record weather.station_record,
    PRIMARY KEY (id, date)
) WITH CLUSTERING ORDER BY (date ASC)
""")
cass.execute("describe table weather.stations").one().create_statement

"CREATE TABLE weather.stations (\n    id text,\n    date date,\n    name text static,\n    record station_record,\n    PRIMARY KEY (id, date)\n) WITH CLUSTERING ORDER BY (date ASC)\n    AND additional_write_policy = '99p'\n    AND bloom_filter_fp_chance = 0.01\n    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}\n    AND cdc = false\n    AND comment = ''\n    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}\n    AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}\n    AND memtable = 'default'\n    AND crc_check_chance = 1.0\n    AND default_time_to_live = 0\n    AND extensions = {}\n    AND gc_grace_seconds = 864000\n    AND max_index_interval = 2048\n    AND memtable_flush_period_in_ms = 0\n    AND min_index_interval = 128\n    AND read_repair = 'BLOCKING'\n    AND speculative_retry = '99p';"

In [7]:
from pyspark.sql import SparkSession
spark = (SparkSession.builder
         .appName("p6")
         .config('spark.jars.packages', 'com.datastax.spark:spark-cassandra-connector_2.12:3.4.0')
         .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
         .getOrCreate())

:: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3d43a4bd-0427-475c-804a-1d9b0155351a;1.0
	confs: [default]
	found com.datastax.spark#spark-cassandra-connector_2.12;3.4.0 in central
	found com.datastax.spark#spark-cassandra-connector-driver_2.12;3.4.0 in central
	found com.datastax.oss#java-driver-core-shaded;4.13.0 in central
	found com.datastax.oss#native-protocol;1.5.0 in central
	found com.datastax.oss#java-driver-shaded-guava;25.1-jre-graal-sub-1 in central
	found com.typesafe#config;1.4.1 in central
	found org.slf4j#slf4j-api;1.7.26 in central
	found io.dropwizard.metrics#metrics-core;4.1.18 in central
	found org.hdrhistogram#HdrHistogram;2.1.12 in central
	found org.reactivestreams#reactive-streams;1.0.3 in central
	found com.github.stephenc.jcip#jcip-annotations;1.0-1 in central
	found com.gith

In [8]:
from pyspark.sql.functions import col, expr
df = spark.read.text("ghcnd-stations.txt")
df2 = df.withColumn("ID", expr("substring(value, 0, 11)"))\
    .withColumn("STATE", expr("substring(value, 39, 2)"))\
    .withColumn("NAME", expr("substring(value, 41, 30)"))\
    .filter(col("STATE")=='WI')
collected_df = df2.collect()

                                                                                

In [9]:
stations_insert = cass.prepare("""
INSERT INTO stations (id, name)
VALUES (?, ?)
""")

In [10]:
for row in collected_df:
    cass.execute(stations_insert, (row.ID, row.NAME))

In [11]:
cass.execute("SELECT COUNT(*) FROM weather.stations").one().count

1313

In [12]:
cass.execute("SELECT name FROM stations WHERE id = 'USW00014837'").one().name

' MADISON DANE CO RGNL AP      '

In [13]:
token_id = cass.execute("SELECT token(id) AS id_token FROM stations WHERE id = 'USC00470273'").one().id_token
token_id

-9014250178872933741

In [23]:
import subprocess
output = subprocess.check_output(["nodetool","ring"], encoding='utf-8')
vnodes = output.split("\n")

updated_vnodes = []
for node in vnodes:
    try:
        token = int(node.split()[-1])
        updated_vnodes.append(node.split())
    except:
        continue
        
for i, node in enumerate(updated_vnodes):
    token = int(node[-1])
    if i == 0:
        prev_token = int(updated_vnodes[-2][-1])
    else:
        prev_token = int(updated_vnodes[i - 1][-1])

    if (prev_token < token and prev_token < token_id <= token):
        vnode = token
    elif (prev_token > token and (token_id > prev_token or token_id <= token)):
        vnode = token
vnode

-8526915793050810996

In [16]:
from pyspark.sql.functions import col, expr, first

temp_df = (spark.read
 .format("parquet")
 .load("records.parquet")
)
temp_df = temp_df.groupBy("station", "date").pivot("element", ["TMAX", "TMIN"]).agg(first("value"))
temp_df = temp_df.withColumn("TMAX", expr("CAST(TMAX AS INT)")) \
       .withColumn("TMIN", expr("CAST(TMIN AS INT)"))
temp_df = temp_df.withColumn("date", expr("to_date(`date`, 'yyyyMMdd')"))

                                                                                

In [17]:
import grpc
import station_pb2, station_pb2_grpc

PORT = 5440
channel = grpc.insecure_channel(f'localhost:{PORT}')
stub = station_pb2_grpc.StationStub(channel)
data_df = temp_df.collect()
for row in data_df:
    stub.RecordTemps(station_pb2.RecordTempsRequest(station=row.station, date=str(row.date), tmax=row.TMAX, tmin=row.TMIN))

                                                                                

_InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.UNAVAILABLE
	details = "failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:5440: Failed to connect to remote host: Connection refused"
	debug_error_string = "UNKNOWN:failed to connect to all addresses; last error: UNKNOWN: ipv4:127.0.0.1:5440: Failed to connect to remote host: Connection refused {created_time:"2023-11-22T22:18:22.461032266+00:00", grpc_status:14}"
>

In [None]:
stub.StationMax(station_pb2.StationMaxRequest(station='USW00014837')).tmax

In [None]:
spark_df = spark.read.format("org.apache.spark.sql.cassandra")\
.option("spark.cassandra.connection.host", "p6-db-1,p6-db-2,p6-db-3")\
.option("keyspace", "weather")\
.option("table", "stations")\
.load()
spark_df.createOrReplaceTempView("stations")

In [None]:
spark.catalog.listTables()

In [None]:
average_diff_df = spark.sql("""
    SELECT id, AVG(record.tmax - record.tmin) as avg_diff
    FROM stations
    WHERE record is not null
    GROUP BY id
""")

average_diff_rows = average_diff_df.collect()

average_diff_dict = {row['id']: row['avg_diff'] for row in average_diff_rows}

average_diff_dict

In [None]:
!nodetool status

In [None]:
error = stub.StationMax(station_pb2.StationMaxRequest(station='USW00014837'))
error

In [None]:
error = stub.RecordTemps(station_pb2.RecordTempsRequest(station='USW00014837', date='2022-01-20', tmax=100, tmin=-100))
error