## Yashika Khurana
### Reservoir Sampling - Big Data

### setting up the environment

In [1]:

import os
import pyspark
# Spark configuration for this notebook: app name plus driver/executor memory
# and the cap on results collected back to the driver.
conf = (
    pyspark.SparkConf()
    .setAppName("Reservoir Sampling Example")
    .set("spark.driver.memory", "8g")
    .set("spark.executor.memory", "8g")
    .set("spark.driver.maxResultSize", "2g")
)

# The UI proxy base is built from the JupyterHub user name. Skip it when the
# variable is absent (e.g. running outside JupyterHub) instead of raising
# KeyError and aborting the whole setup cell.
hub_user = os.environ.get('JUPYTERHUB_USER')
if hub_user:
    conf.set('spark.ui.proxyBase', '/user/' + hub_user + '/proxy/4040')

# getOrCreate() makes this cell safe to re-run: a second execution reuses the
# live session instead of failing because a SparkContext already exists.
# NOTE: when a session is reused, settings such as driver memory keep the
# values they had when the session was first started.
spark = pyspark.sql.SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/01 09:23:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### importing required libraries

In [2]:
import IPython
import pyspark
import glob
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import month, year, col
from pyspark.sql.functions import isnull, sum
from pyspark.sql.functions import countDistinct
import pyspark.sql.functions as F

### path to the data

In [3]:
# Directory that holds the input CSV files. The path is relative, so it is
# resolved against the notebook's current working directory.
rootpath = 'shared/midterm/drive_stats_2019_Q1/'

### reading the csv files

In [4]:
# Collect every CSV directly under rootpath. The pattern has no "**", so a
# recursive glob would have no effect; sorting keeps the file list stable
# between runs.
csv_paths = sorted(glob.glob(rootpath + "*.csv"))
if not csv_paths:
    # Fail early with a clear message rather than letting Spark report an
    # obscure error for an empty path list.
    raise FileNotFoundError(f"No CSV files found under {rootpath!r}")

# header=True takes column names from the first line of each file;
# inferSchema=True makes Spark scan the data to guess column types.
df = spark.read.csv(csv_paths, inferSchema=True, header=True)

                                                                                

In [17]:
# Total number of rows across all loaded CSV files (runs a Spark job).
df.count()

                                                                                

9577046

In [5]:
from random import random

In [6]:
# Reservoir capacity: reservoir_sampling() keeps at most this many rows.
k = 50000 #sample size

In [7]:
# Running count of rows seen so far; reservoir_sampling() adds 1 per row and
# reads .value to pick a replacement slot.
# NOTE(review): in this notebook the counter is only updated and read on the
# driver, so a plain integer would serve the same purpose — confirm before changing.
accumulator = sc.accumulator(0)

In [8]:
# The sample itself: reservoir_sampling() appends rows until it holds k of
# them, then overwrites entries in place.
reservoir = []

### reservoir sampling function

In [66]:
def reservoir_sampling(row):
    """Offer one row to the module-level reservoir.

    Increments the shared ``accumulator`` by one, then either appends ``row``
    (while the reservoir holds fewer than ``k`` rows) or draws a random slot in
    ``[0, accumulator.value)`` and overwrites that slot when it falls inside the
    reservoir.

    Relies on the module-level names ``reservoir``, ``accumulator``, ``k`` and
    ``random``. Returns None; the reservoir is modified in place.
    """
    # Count this row before deciding its fate so the draw below ranges over
    # every row seen so far, including the current one.
    accumulator.add(1)

    # Filling phase: keep every row until the reservoir reaches capacity.
    if len(reservoir) < k:
        reservoir.append(row)
        return

    # Replacement phase: the row displaces an existing entry only when the
    # drawn slot lands inside the reservoir.
    slot = int(random() * accumulator.value)
    if slot < k:
        reservoir[slot] = row

### applying reservoir sampling to the dataframe

In [10]:
# Start from a clean slate so that re-running this cell does not mix in rows
# or counts left over from a previous pass over the data.
reservoir.clear()
accumulator.value = 0

# toLocalIterator() hands the rows to the driver one at a time, so the full
# DataFrame never has to be collected into driver memory at once.
for row in df.toLocalIterator():
    reservoir_sampling(row)
    
    

23/04/01 09:30:57 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


[Stage 33:>                                                         (0 + 1) / 1]

In [15]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType

### defining the entire schema explicitly, because some column types could not be inferred automatically by createDataFrame()

In [27]:
# SMART attribute ids present in the data, in column order. Each id contributes
# a "smart_<id>_normalized" column followed by a "smart_<id>_raw" column.
_SMART_ATTRIBUTE_IDS = [
    1, 2, 3, 4, 5, 7, 8, 9, 10, 11, 12, 13, 15, 16, 17, 22, 23, 24,
    168, 170, 173, 174, 177, 179, 181, 182, 183, 184, 187, 188, 189, 190,
    191, 192, 193, 194, 195, 196, 197, 198, 199, 200, 201, 218, 220, 222,
    223, 224, 225, 226, 231, 232, 233, 235, 240, 241, 242, 250, 251, 252,
    254, 255,
]

# Columns declared as IntegerType; every other non-date column is StringType.
_INTEGER_COLUMNS = {
    "failure",
    "smart_8_normalized",
    "smart_170_normalized",
    "smart_174_normalized",
    "smart_183_normalized",
}

# Non-date column names in their final order.
_column_names = ["serial_number", "model", "capacity_bytes", "failure"]
for _attr_id in _SMART_ATTRIBUTE_IDS:
    _column_names.append("smart_{}_normalized".format(_attr_id))
    _column_names.append("smart_{}_raw".format(_attr_id))

# Full schema: the date column first, then all remaining columns; every field
# is nullable.
schema = StructType(
    [StructField("date", DateType(), True)]
    + [
        StructField(
            _name,
            IntegerType() if _name in _INTEGER_COLUMNS else StringType(),
            True,
        )
        for _name in _column_names
    ]
)

In [28]:
# Build a Spark DataFrame from the sampled rows held in `reservoir`, applying
# the explicit `schema` rather than letting Spark infer column types.
df2 = spark.createDataFrame(reservoir, schema=schema)

### writing the resultant dataframe to a csv file

In [29]:
# Write the sample as CSV with a header row. mode("overwrite") replaces any
# existing output at this path.
# NOTE: Spark creates a directory named "output.csv" containing part files,
# not a single CSV file.
df2.write.format("csv").option("header", "true").mode("overwrite").save("output.csv")

23/04/01 11:10:46 WARN TaskSetManager: Stage 34 contains a task of very large size (5539 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

### the resulting dataframe contains 50k samples as expected

In [31]:
# Sanity check: the sample should contain exactly k rows.
df2.count()

23/04/01 11:12:26 WARN TaskSetManager: Stage 36 contains a task of very large size (5539 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

50000

In [30]:
# Peek at the first three sampled rows.
df2.head(n=3)

23/04/01 11:11:53 WARN TaskSetManager: Stage 35 contains a task of very large size (5539 KiB). The maximum recommended task size is 1000 KiB.


[Row(date=datetime.date(2019, 1, 25), serial_number='PL2331LAHBUH0J', model='HGST HMS5C4040BLE640', capacity_bytes='4000787030016', failure=0, smart_1_normalized='100', smart_1_raw='0', smart_2_normalized='134', smart_2_raw='100', smart_3_normalized='100', smart_3_raw='0', smart_4_normalized='100', smart_4_raw='2', smart_5_normalized='100', smart_5_raw='0', smart_7_normalized='100', smart_7_raw='0', smart_8_normalized=113, smart_8_raw='42', smart_9_normalized='97', smart_9_raw='22915', smart_10_normalized='100', smart_10_raw='0', smart_11_normalized=None, smart_11_raw=None, smart_12_normalized='100', smart_12_raw='2', smart_13_normalized=None, smart_13_raw=None, smart_15_normalized=None, smart_15_raw=None, smart_16_normalized=None, smart_16_raw=None, smart_17_normalized=None, smart_17_raw=None, smart_22_normalized=None, smart_22_raw=None, smart_23_normalized=None, smart_23_raw=None, smart_24_normalized=None, smart_24_raw=None, smart_168_normalized=None, smart_168_raw=None, smart_170_no