In [1]:
from plotly.offline import plot
from plotly.graph_objs import *
import numpy as np
from graphframes import *
import numpy as np
from pyspark.sql.functions import lit, col, create_map
from itertools import chain

In [2]:
# Where the input lives and which reader format parses it.
file_location = "/FileStore/tables/one_day_group.csv"
file_type = "csv"

# Reader settings. They only matter for CSV input; other formats ignore them.
infer_schema = "True"
first_row_is_header = "True"
delimiter = ","

# Configure the reader in a single options() call and load the file
# into a Spark DataFrame.
df = (
    spark.read.format(file_type)
    .options(inferSchema=infer_schema, header=first_row_is_header, sep=delimiter)
    .load(file_location)
)

display(df)

In [3]:
# Register the loaded DataFrame as a temporary view so it can be queried
# by name; an existing view with the same name is replaced.
temp_table_name = "one_day_group_csv"
df.createOrReplaceTempView(name=temp_table_name)

In [4]:
# df holds roughly 4 million rows; preview only the first 10 of them.
df.show(n=10)
# Convert the full Spark DataFrame into a pandas DataFrame (this pulls
# every row onto the driver).
df2 = df.toPandas()

In [5]:
def minute_partitioner(key):
    """Return Python's built-in hash of ``key``.

    Used as the partition function handed to ``partitionBy`` later in
    this notebook; the returned integer is not reduced modulo the number
    of partitions here.
    """
    hashed_key = hash(key)
    return hashed_key
  
# Validate results
# num_partitions = 5
# print(minute_partitioner(3) % num_partitions)
# print(minute_partitioner(2) % num_partitions)
# print(minute_partitioner(1) % num_partitions)

In [6]:
# All the entries.
# df_preprocess is the DataFrame before the key-value (map) column is added.

# Drop the "_c0" column (presumably an unnamed index column left by an
# earlier CSV export -- confirm against the source file).
df_all = df2.drop(columns=["_c0"])
df_create = spark.createDataFrame(df_all)
df_rename = df_create.withColumnRenamed("Time_in_minute", "Latencyminute")
df_rename1 = df_rename.withColumnRenamed("Time", "Latency_timestamp")
# Duplicate the minute column as "Key". The column is referenced by name so
# that it resolves against df_rename1 itself; the original pulled it from the
# sibling DataFrame df_rename, which only works while both share lineage.
df_preprocess = df_rename1.withColumn("Key", col("Latencyminute"))
df_preprocess.show(10)

In [7]:
# Build the key-value representation used for hash partitioning on "Key"
# (see partitionBy() further down): every column whose name contains
# "Latency" becomes one entry of a single map column called "Latency".
latency_columns = [
    column_name for column_name in df_preprocess.columns if "Latency" in column_name
]
# create_map expects a flat [key1, value1, key2, value2, ...] sequence.
map_entries = [
    entry
    for column_name in latency_columns
    for entry in (lit(column_name), col(column_name))
]
key_dataframe = create_map(map_entries).alias("Latency")

df_final = df_preprocess.select("Key", key_dataframe)
df_final.show(5)

In [8]:
# Actual data-parallelisation stage: key every row by the minute bin stored in
# its "Key" column and hash-partition the resulting pair RDD.
NUM_PARTITIONS = 12

# Work on the DataFrame's underlying RDD directly. Collecting all rows to the
# driver and calling sc.parallelize() on them again moves the whole dataset
# through driver memory for no benefit.
rdd_final = df_final.rdd \
        .map(lambda row: (row['Key'], row)) \
        .partitionBy(NUM_PARTITIONS, minute_partitioner)


print("Number of partitions: {}".format(rdd_final.getNumPartitions()))
print("Partitioner: {}".format(rdd_final.partitioner))
# Report how many rows landed in each partition instead of printing every row
# of every partition, which would flood the output for a dataset this large.
print("Partition sizes: {}".format(rdd_final.glom().map(len).collect()))

In [9]:
# Stratified sample without replacement: keep roughly 5% of the rows of
# every key.
SAMPLE_FRACTION = 0.05
SAMPLE_SEED = 42  # fixed seed so that re-running the cell gives the same sample

# sampleByKey looks up a fraction for each key it encounters, so the mapping
# must contain every key present in the RDD. The original {12: 0.05} only
# covered the key 12 and raises KeyError for any other key.
fractions = {key: SAMPLE_FRACTION for key in rdd_final.keys().distinct().collect()}
approxSample1 = rdd_final.sampleByKey(False, fractions, seed=SAMPLE_SEED)
print(approxSample1.take(100))

In [10]:
# Return the sampled, parallelised data above to a DataFrame.
# Each value in approxSample1 is a Row with the fields "Key" and "Latency".
# Row objects have no keys() method, so the entries of the row's "Latency"
# map are expanded instead: one (name, value) pair per map entry.
# NOTE(review): this assumes the goal is one output row per map entry -- confirm.
rdd2 = approxSample1.flatMapValues(lambda row: list(row["Latency"].items()))
# Flatten (key, (name, value)) into (key, name, value). The column names are
# kept from the original: the second column holds the map entry's name and the
# third its value.
rdd2.map(lambda pair: (pair[0], pair[1][0], pair[1][1]))\
    .toDF(("key", "Latency_timestamp", "Latency"))\
    .show()

In [11]:
# The figure is rendered to an HTML <div> string and shown inline; writing it
# to a standalone HTML page is also possible but is not needed here.
# NOTE(review): `x` and `y` are not assigned in any visible cell -- they must
# be defined before this cell can run on a fresh kernel.
scatter_trace = Scatter(x=x, y=y, mode='markers')
# Alternative trace left disabled by the original author:
#     Histogram2dContour(x=x, y=y, contours=Contours(coloring='heatmap'))
p = plot([scatter_trace], output_type='div')

displayHTML(p)