In [1]:
!nodetool status

Datacenter: datacenter1
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address        Load       Tokens  Owns (effective)  Host ID                               Rack 
UN  192.168.208.3  70.27 KiB  16      68.8%             2c7ded57-e92b-451c-8477-1f2474d88528  rack1
UN  192.168.208.2  70.27 KiB  16      72.0%             c93c5e18-bc9b-4f8f-854f-d25ce3cc0edb  rack1
UN  192.168.208.4  70.28 KiB  16      59.2%             7409d49a-8643-4194-baf0-e425585d34b7  rack1



# Part 1: Station Data

In [2]:
# Connect to the Cassandra cluster using this code:
from cassandra.cluster import Cluster
cluster = Cluster(['p6-db-1', 'p6-db-2', 'p6-db-3'])
cass = cluster.connect()

In [3]:
# Then write code to do the following:

# drop a weather keyspace if it already exists
cass.execute("DROP KEYSPACE IF EXISTS weather;")

# create a weather keyspace with 3x replication
cass.execute("""
CREATE KEYSPACE weather WITH
replication = {'class': 'SimpleStrategy', 'replication_factor': 3};
""")

<cassandra.cluster.ResultSet at 0x798ca6dd5030>

In [4]:
#Use the weather keyspace
cass.execute("use weather")

# inside weather, create a station_record type containing two ints: tmin and tmax
cass.execute("""
create type station_record(
    tmin INT,
    tmax INT
)
""")

<cassandra.cluster.ResultSet at 0x798ca7efb520>

In [5]:
#Inside weather, create a station table
#The stations table should have four columns: 
#id (text), name (text), date (date), record (weather.station_record):

# id is a partition key and corresponds to a station's ID (like 'USC00470273')
# date is a cluster key, ascending
# name is a static field (because there is only one name per ID). Example: 'UW ARBORETUM - MADISON'
# record is a regular field because there will be many records per station partition.

cass.execute("""
create table stations(
    id TEXT,
    name TEXT static,
    date DATE,
    record weather.station_record,
    PRIMARY KEY ((id), date)
) WITH CLUSTERING ORDER BY (date ASC)
""")

<cassandra.cluster.ResultSet at 0x798ca6d9ef20>

In [6]:
#q1
# What is the Schema of stations?
cass.execute("describe table weather.stations").one().create_statement

"CREATE TABLE weather.stations (\n    id text,\n    date date,\n    name text static,\n    record station_record,\n    PRIMARY KEY (id, date)\n) WITH CLUSTERING ORDER BY (date ASC)\n    AND additional_write_policy = '99p'\n    AND bloom_filter_fp_chance = 0.01\n    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}\n    AND cdc = false\n    AND comment = ''\n    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}\n    AND compression = {'chunk_length_in_kb': '16', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}\n    AND memtable = 'default'\n    AND crc_check_chance = 1.0\n    AND default_time_to_live = 0\n    AND extensions = {}\n    AND gc_grace_seconds = 864000\n    AND max_index_interval = 2048\n    AND memtable_flush_period_in_ms = 0\n    AND min_index_interval = 128\n    AND read_repair = 'BLOCKING'\n    AND speculative_retry = '99p';"

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import substring, col
import pandas as pd

spark = (SparkSession.builder
         .appName("p6")
         .config('spark.jars.packages', 'com.datastax.spark:spark-cassandra-connector_2.12:3.4.0')
         .config("spark.sql.extensions", "com.datastax.spark.connector.CassandraSparkExtensions")
         .getOrCreate())

station_data = spark.read.text("/nb/ghcnd-stations.txt")

#substring(str, pos, len)
#(end - start )+ 1
station_data = station_data.select(
    substring('value', 1, 11).alias('ID'),  
    substring('value', 39, 2).alias('STATE'),
    substring('value', 42, 30).alias('NAME')
)

# printing Dataframe schema to get the column names
#station_data.printSchema()

# visualizing the dataframe
#station_data.show(truncate=False)

wi_station_df = station_data.filter(col('STATE') == 'WI')
wi_station = wi_station_df.collect() 

for row in wi_station:
    wi_insert = cass.prepare(
            f"INSERT INTO weather.stations (id, name) VALUES (?, ?)"
        )
    cass.execute(wi_insert, (row['ID'], row['NAME']))
    
output = pd.DataFrame(cass.execute("SELECT COUNT(*) FROM weather.stations"))
output.iloc[0,0]

:: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
com.datastax.spark#spark-cassandra-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-af3d0474-546f-47a7-9723-339833108943;1.0
	confs: [default]


	found com.datastax.spark#spark-cassandra-connector_2.12;3.4.0 in central
	found com.datastax.spark#spark-cassandra-connector-driver_2.12;3.4.0 in central


	found com.datastax.oss#java-driver-core-shaded;4.13.0 in central
	found com.datastax.oss#native-protocol;1.5.0 in central
	found com.datastax.oss#java-driver-shaded-guava;25.1-jre-graal-sub-1 in central


	found com.typesafe#config;1.4.1 in central


	found org.slf4j#slf4j-api;1.7.26 in central


	found io.dropwizard.metrics#metrics-core;4.1.18 in central
	found org.hdrhistogram#HdrHistogram;2.1.12 in central
	found org.reactivestreams#reactive-streams;1.0.3 in central


	found com.github.stephenc.jcip#jcip-annotations;1.0-1 in central
	found com.github.spotbugs#spotbugs-annotations;3.1.12 in central


	found com.google.code.findbugs#jsr305;3.0.2 in central
	found com.datastax.oss#java-driver-mapper-runtime;4.13.0 in central


	found com.datastax.oss#java-driver-query-builder;4.13.0 in central


	found org.apache.commons#commons-lang3;3.10 in central


	found com.thoughtworks.paranamer#paranamer;2.8 in central
	found org.scala-lang#scala-reflect;2.12.11 in central
downloading https://repo1.maven.org/maven2/com/datastax/spark/spark-cassandra-connector_2.12/3.4.0/spark-cassandra-connector_2.12-3.4.0.jar ...


	[SUCCESSFUL ] com.datastax.spark#spark-cassandra-connector_2.12;3.4.0!spark-cassandra-connector_2.12.jar (124ms)
downloading https://repo1.maven.org/maven2/com/datastax/spark/spark-cassandra-connector-driver_2.12/3.4.0/spark-cassandra-connector-driver_2.12-3.4.0.jar ...
	[SUCCESSFUL ] com.datastax.spark#spark-cassandra-connector-driver_2.12;3.4.0!spark-cassandra-connector-driver_2.12.jar (69ms)
downloading https://repo1.maven.org/maven2/com/datastax/oss/java-driver-core-shaded/4.13.0/java-driver-core-shaded-4.13.0.jar ...


	[SUCCESSFUL ] com.datastax.oss#java-driver-core-shaded;4.13.0!java-driver-core-shaded.jar (274ms)
downloading https://repo1.maven.org/maven2/com/datastax/oss/java-driver-mapper-runtime/4.13.0/java-driver-mapper-runtime-4.13.0.jar ...
	[SUCCESSFUL ] com.datastax.oss#java-driver-mapper-runtime;4.13.0!java-driver-mapper-runtime.jar(bundle) (30ms)
downloading https://repo1.maven.org/maven2/org/apache/commons/commons-lang3/3.10/commons-lang3-3.10.jar ...
	[SUCCESSFUL ] org.apache.commons#commons-lang3;3.10!commons-lang3.jar (48ms)
downloading https://repo1.maven.org/maven2/com/thoughtworks/paranamer/paranamer/2.8/paranamer-2.8.jar ...
	[SUCCESSFUL ] com.thoughtworks.paranamer#paranamer;2.8!paranamer.jar(bundle) (26ms)
downloading https://repo1.maven.org/maven2/org/scala-lang/scala-reflect/2.12.11/scala-reflect-2.12.11.jar ...


	[SUCCESSFUL ] org.scala-lang#scala-reflect;2.12.11!scala-reflect.jar (139ms)
downloading https://repo1.maven.org/maven2/com/datastax/oss/native-protocol/1.5.0/native-protocol-1.5.0.jar ...
	[SUCCESSFUL ] com.datastax.oss#native-protocol;1.5.0!native-protocol.jar(bundle) (37ms)
downloading https://repo1.maven.org/maven2/com/datastax/oss/java-driver-shaded-guava/25.1-jre-graal-sub-1/java-driver-shaded-guava-25.1-jre-graal-sub-1.jar ...
	[SUCCESSFUL ] com.datastax.oss#java-driver-shaded-guava;25.1-jre-graal-sub-1!java-driver-shaded-guava.jar (99ms)
downloading https://repo1.maven.org/maven2/com/typesafe/config/1.4.1/config-1.4.1.jar ...
	[SUCCESSFUL ] com.typesafe#config;1.4.1!config.jar(bundle) (32ms)
downloading https://repo1.maven.org/maven2/org/slf4j/slf4j-api/1.7.26/slf4j-api-1.7.26.jar ...
	[SUCCESSFUL ] org.slf4j#slf4j-api;1.7.26!slf4j-api.jar (25ms)


downloading https://repo1.maven.org/maven2/io/dropwizard/metrics/metrics-core/4.1.18/metrics-core-4.1.18.jar ...
	[SUCCESSFUL ] io.dropwizard.metrics#metrics-core;4.1.18!metrics-core.jar(bundle) (29ms)
downloading https://repo1.maven.org/maven2/org/hdrhistogram/HdrHistogram/2.1.12/HdrHistogram-2.1.12.jar ...
	[SUCCESSFUL ] org.hdrhistogram#HdrHistogram;2.1.12!HdrHistogram.jar(bundle) (28ms)
downloading https://repo1.maven.org/maven2/org/reactivestreams/reactive-streams/1.0.3/reactive-streams-1.0.3.jar ...
	[SUCCESSFUL ] org.reactivestreams#reactive-streams;1.0.3!reactive-streams.jar (27ms)
downloading https://repo1.maven.org/maven2/com/github/stephenc/jcip/jcip-annotations/1.0-1/jcip-annotations-1.0-1.jar ...
	[SUCCESSFUL ] com.github.stephenc.jcip#jcip-annotations;1.0-1!jcip-annotations.jar (25ms)
downloading https://repo1.maven.org/maven2/com/github/spotbugs/spotbugs-annotations/3.1.12/spotbugs-annotations-3.1.12.jar ...
	[SUCCESSFUL ] com.github.spotbugs#spotbugs-annotations;3.1.12!

:: retrieving :: org.apache.spark#spark-submit-parent-af3d0474-546f-47a7-9723-339833108943
	confs: [default]
	18 artifacts copied, 0 already retrieved (18067kB/84ms)


24/04/14 17:55:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


[Stage 0:>                                                          (0 + 2) / 2]

                                                                                

1313

In [8]:
#q2
#what is the name corresponding to station ID US1WIMR0003?
output = pd.DataFrame(cass.execute("""
SELECT name
FROM weather.stations
WHERE id = 'US1WIMR0003'
"""))
output.iloc[0,0]

'AMBERG 1.3 SW                 '

In [9]:
#q3
# what is the token for the USC00470273 station?
output = pd.DataFrame(cass.execute("""
SELECT TOKEN(ID)
FROM weather.stations
WHERE id = 'USC00470273'
"""))

output.iloc[0,0]

-9014250178872933741

In [10]:
#q4

# what is the first vnode token in the ring following the token for USC00470273?
import re #To read the output using regex
from subprocess import check_output

 

result = cass.execute("""
SELECT TOKEN(ID)
FROM weather.stations
WHERE id = 'USC00470273'
""")
#Use check_output to run nodetool ring
output = check_output(["nodetool", "ring"])
tokens = re.findall(r"-?\d{19}", output.decode("utf-8")) #Use regex to find all the token numbers in the output

#Each token in tokens is a string so covert to ints with list comphrehension
tokens = [int(token) for token in tokens]

#Find the next token after the target token
#sort the tokens
tokens.sort()
target_token = result[0][0]
next_token = None
#Loop through the tokens until we find the next bigger token than the current token for USC00470273
for token in tokens:
    if token > target_token:
        next_token = token
        break #Break because we want to find just the next token thats bigger
next_token

  target_token = result[0][0]


-8744802913408059322

# Part 2 Weather Data

In [11]:
#Unzip records.zip cell
! unzip -o records.zip
records_df = spark.read.parquet("records.parquet")

#Import statements to answer q5
import grpc
import station_pb2
import station_pb2_grpc
import datetime #Using date time to convert the row.date to %Y-%m-%d format

Archive:  records.zip
  inflating: records.parquet/part-00000-574ab704-2431-4c8b-9d88-6c635a467b99-c000.snappy.parquet  
 extracting: records.parquet/._SUCCESS.crc  
  inflating: records.parquet/part-00002-574ab704-2431-4c8b-9d88-6c635a467b99-c000.snappy.parquet  
  inflating: records.parquet/part-00001-574ab704-2431-4c8b-9d88-6c635a467b99-c000.snappy.parquet  
  inflating: records.parquet/part-00003-574ab704-2431-4c8b-9d88-6c635a467b99-c000.snappy.parquet  
 extracting: records.parquet/.part-00003-574ab704-2431-4c8b-9d88-6c635a467b99-c000.snappy.parquet.crc  
 extracting: records.parquet/_SUCCESS  
 extracting: records.parquet/.part-00000-574ab704-2431-4c8b-9d88-6c635a467b99-c000.snappy.parquet.crc  
 extracting: records.parquet/.part-00001-574ab704-2431-4c8b-9d88-6c635a467b99-c000.snappy.parquet.crc  
 extracting: records.parquet/.part-00002-574ab704-2431-4c8b-9d88-6c635a467b99-c000.snappy.parquet.crc  


[Stage 1:>                                                          (0 + 1) / 1]

                                                                                

In [12]:
record_output = records_df.groupBy("station", "date").pivot("element", ["TMIN", "TMAX"]).sum("value")

channel = grpc.insecure_channel(f"localhost:5440")
stub = station_pb2_grpc.StationStub(channel)
    
for row in record_output.collect():
    date_output = datetime.datetime.strptime(row.date, '%Y%m%d') #Convert to YYYY-MM-DD TIME
    date_output_formatted = date_output.strftime("%Y-%m-%d") #Remove the TIME
    response = stub.RecordTemps(station_pb2.RecordTempsRequest(station = row.station, date = date_output_formatted, tmin = int(row.TMIN), tmax = int(row.TMAX)))
    # error = response.error
    # print(error)


[Stage 2:>                                                          (0 + 2) / 2]

[Stage 4:>                                                          (0 + 1) / 1]

                                                                                

In [13]:
#q5
#what is the max temperature ever seen for station USW00014837?
#Request for station "USW00014837
request = station_pb2.StationMaxRequest(station = "USW00014837")

max_response= stub.StationMax(request)
max_response.tmax

356

# Part 3: Spark Analysis

In [14]:
# Create a temporary view in Spark named stations that corresponds to the stations table in Cassandra.
# Hint: you already enabled CassandraSparkExtensions when creating your Spark session, 
#so you can create a Spark DataFrame corresponding to a Cassandra table like this:
stations_df_spark = (spark.read.format("org.apache.spark.sql.cassandra")
.option("spark.cassandra.connection.host", "p6-db-1,p6-db-2,p6-db-3")
.option("keyspace", 'weather')
.option("table", 'stations')
.load())

stations_df_spark.createOrReplaceTempView("stations")

In [15]:
#q6
#what tables/views are available in the Spark catalog?
tables_list = spark.catalog.listTables()
#len(tables_list) #Only one table
tables_list

[Table(name='stations', catalog=None, namespace=[], description=None, tableType='TEMPORARY', isTemporary=True)]

In [16]:
#q7
#what is the average difference between tmax and tmin, 
#for each of the four stations that have temperature records?
from pyspark.sql.functions import col


output_df = spark.sql("""
SELECT *
FROM stations
WHERE record IS NOT NULL
""")

#Separate the record column row objects into their own columns
output_df = (output_df.withColumn('temp_min', col('record')['tmin'])
            .withColumn('temp_max', col('record')['tmax'])
            .withColumn('temp_diff', col('temp_max') - col('temp_min')))

output_df = output_df.drop("temp_min", "temp_max") # Drop the min and max columns

#Convert to pandas, group by id, calculate the mean of temp_diff, convert to dict,
average_temp_diff_dict = output_df.toPandas().groupby("id").mean("temp_diff").to_dict()['temp_diff']
average_temp_diff_dict

[Stage 10:>                                                         (0 + 2) / 6]









{'USR0000WDDG': 102.06849315068493,
 'USW00014837': 105.62739726027397,
 'USW00014839': 89.6986301369863,
 'USW00014898': 102.93698630136986}

# Part 4: Disaster Strikes

In [17]:
#Run Docker command 'docker stop p6-db-2' to kill the container

In [18]:
#q8
#what does nodetool status output?

#Use the ! COMMAND to show the output in a cell
!nodetool status

Datacenter: datacenter1
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address        Load        Tokens  Owns (effective)  Host ID                               Rack 
UN  192.168.208.3  87.75 KiB   16      100.0%            2c7ded57-e92b-451c-8477-1f2474d88528  rack1
UN  192.168.208.2  113.17 KiB  16      100.0%            c93c5e18-bc9b-4f8f-854f-d25ce3cc0edb  rack1
DN  192.168.208.4  87.76 KiB   16      100.0%            7409d49a-8643-4194-baf0-e425585d34b7  rack1



In [19]:
#q9
#if you make a StationMax RPC call, what does the error field contain in StationMaxReply reply?
request = station_pb2.StationMaxRequest(station = "USW00014837")

max_response = stub.StationMax(request)
max_response.error

'need 3 replicas, but only have 2'

In [20]:
#q10
# if you make a RecordTempsRequest RPC call, what does error contain in the RecordTempsReply reply?

#make up data
request = stub.RecordTemps(station_pb2.RecordTempsRequest(station = "UUUU1111111", date = "2022-01-01", tmin = -1000, tmax = 1000))
request.error # should be ''

''