In [None]:
!pip install --upgrade pip
!pip install --upgrade pandas
!pip install --upgrade google-api-python-client
!pip install --upgrade seaborn
!pip install --upgrade networkx
!pip install --upgrade matplotlib
!pip install --upgrade pyspark
!pip install --upgrade pyspark_dist_explore


In [None]:
import re
import ast
import time
import numpy as np
import pandas as pd
import seaborn as sns
import networkx as nx
import matplotlib.pyplot as plt
import os
from IPython.display import display, HTML, display_html #usefull to display wide tables
from pyspark_dist_explore import Histogram, hist, distplot, pandas_histogram
from pyspark.sql import functions as F, types


In [None]:
%matplotlib inline


In [None]:
# NOTE(review): `spark` is not created in any visible cell — presumably the SparkSession
# injected by the PySpark kernel; confirm. Evaluating it here just echoes the session.
spark

In [None]:
# Keep a handle on the session's SparkContext; it is used below for sc.getConf and sc.textFile.
sc = spark.sparkContext

In [107]:
# List the configuration of the running SparkContext as (key, value) pairs.
sc.getConf().getAll()

[('spark.eventLog.enabled', 'true'),
 ('spark.dynamicAllocation.minExecutors', '1'),
 ('spark.eventLog.dir', 'hdfs://cluster-w261-m/user/spark/eventlog'),
 ('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES',
  'http://cluster-w261-m:8088/proxy/application_1543797393506_0002'),
 ('spark.yarn.am.memory', '640m'),
 ('spark.executor.cores', '4'),
 ('spark.driver.appUIAddress',
  'http://cluster-w261-m.c.w266-203603.internal:4040'),
 ('spark.executor.instances', '2'),
 ('spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS',
  'cluster-w261-m'),
 ('spark.ui.proxyBase', '/proxy/application_1543797393506_0002'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.yarn.historyServer.address', 'cluster-w261-m:18080'),
 ('spark.driver.maxResultSize', '3840m'),
 ('spark.driver.port', '34603'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.filters',
  'org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter'),
 ('sp

In [None]:
#https://blog.cloudera.com/blog/2015/03/how-to-tune-your-apache-spark-jobs-part-2/

#The number of cores can be specified with the --executor-cores flag when invoking 
#spark-submit, spark-shell, and pyspark from the command line, 
#or by setting the spark.executor.cores property in the spark-defaults.conf file 
#or on a SparkConf object.

#The cores property controls the number of concurrent tasks an executor can run. 
#--executor-cores 5 means that each executor can run a maximum of five tasks at the same time.

#The heap size can be controlled with the --executor-memory flag 
#or the spark.executor.memory property
#The memory property impacts the amount of data Spark can cache, 
#as well as the maximum sizes of the shuffle data structures used for grouping, aggregations, and joins.

#The --num-executors command-line flag or spark.executor.instances configuration property 
#controls the number of executors requested. Starting in CDH 5.4/Spark 1.3, 
#you can avoid setting this property by turning on dynamic allocation 
#with the spark.dynamicAllocation.enabled property. 
#Dynamic allocation enables a Spark application to request executors 
#when there is a backlog of pending tasks and free up executors when idle.


In [None]:
# spark.conf.set("spark.executor.memory", '19g')
# spark.conf.set('spark.executor.cores', '5')
# sc = spark.sparkContext


In [119]:
def f_calc_stats(data, column):
    """Compute summary statistics for one column of a Spark DataFrame.

    Args:
        data: DataFrame to aggregate.
        column: Name of the column to summarise.

    Returns:
        The list produced by ``collect()`` on the aggregation. Its first row
        holds, in order: average, minimum, maximum, population standard
        deviation, population variance and skewness of ``column``.
    """
    target = data[column]
    aggregations = (
        F.avg(target),
        F.min(target),
        F.max(target),
        F.stddev_pop(target),
        F.var_pop(target),
        F.skewness(target),
    )
    return data.agg(*aggregations).collect()

In [99]:
def f_check_null(data, column):
    """Count the rows of ``data`` whose ``column`` value is missing.

    A value counts as missing when it equals the empty string, is null,
    or is NaN.

    Args:
        data: DataFrame to inspect.
        column: Name of the column to test.

    Returns:
        Number of rows matching any of the three conditions.
    """
    value = data[column]
    is_blank = value == ""
    is_missing = is_blank | F.isnull(value) | F.isnan(value)
    return data.filter(is_missing).count()

In [146]:
def f_display_stats(data):
    """Display a table of summary statistics for every integer column of ``data``.

    For each column whose dtype string starts with ``'int'``, the table shows
    the mean, min, max, population standard deviation, population variance
    and skewness (from ``f_calc_stats``) together with the count and
    percentage of missing values (from ``f_check_null``). Every cell is
    stored as a string and the frame is rendered as HTML, one row per column.

    Aggregates that come back as ``None`` (e.g. a column with no non-null
    values) are shown as ``'None'`` rather than raising ``TypeError`` in
    ``round``; the percentage is reported as ``0.0`` for a DataFrame with no
    rows rather than raising ``ZeroDivisionError``.

    Args:
        data: Spark DataFrame to profile.

    Returns:
        None. The table is emitted through ``display``.
    """
    def _rounded(value):
        # round(None, 2) raises TypeError, so pass None through untouched.
        return str(None if value is None else round(value, 2))

    total_rows = data.count()
    int_columns = [name for name, dtype in data.dtypes if dtype.startswith('int')]

    stats_by_column = {}
    for colname in int_columns:
        mean_val, min_val, max_val, stddev, var, skewness = f_calc_stats(data, colname)[0]
        count_nulls = f_check_null(data, colname)
        # Guard the division: an empty DataFrame has zero rows.
        pct_nulls = float(count_nulls / total_rows * 100) if total_rows else 0.0
        stats_by_column[colname] = {
            'mean': _rounded(mean_val),
            'min': str(min_val),
            'max': str(max_val),
            'stddev': _rounded(stddev),
            'var': _rounded(var),
            'skewness': _rounded(skewness),
            'nulls_nans': str(count_nulls),
            'pct_nulls_nans': str(round(pct_nulls, 2)),
        }
    # Transpose so each profiled column becomes one row of the rendered table.
    display(HTML(pd.DataFrame(stats_by_column).T.to_html()))

In [None]:
# Read the raw training file from the gs:// bucket as an RDD of text lines;
# the second argument (50) is textFile's minPartitions hint.
trainRDD = sc.textFile('gs://bucket-w261-final/data/train.txt',50)
# Mark the RDD for caching (kept as the cell's last expression).
trainRDD.cache()

In [None]:
# Count the lines in the training file; this is the first action run on trainRDD.
trainRDD.count()

In [None]:
# Split each tab-separated line into its fields and name them: the click label,
# 13 integer features (int_*) and 26 categorical features (categ_*).
# All fields are still strings at this point.
trainDF = (
    trainRDD
    .map(lambda line: line.split('\t'))
    .toDF([
        "clicked_0_1",
        "int_1", "int_2", "int_3", "int_4", "int_5", "int_6", "int_7",
        "int_8", "int_9", "int_10", "int_11", "int_12", "int_13",
        "categ_1", "categ_2", "categ_3", "categ_4", "categ_5", "categ_6",
        "categ_7", "categ_8", "categ_9", "categ_10", "categ_11", "categ_12",
        "categ_13", "categ_14", "categ_15", "categ_16", "categ_17", "categ_18",
        "categ_19", "categ_20", "categ_21", "categ_22", "categ_23", "categ_24",
        "categ_25", "categ_26",
    ])
)


In [None]:
# Mark the (still all-string) DataFrame for caching.
trainDF.cache()

In [None]:
# Cast the 13 numeric feature columns (int_1 .. int_13) from string to integer.
# One loop replaces thirteen copy-pasted withColumn statements; the columns,
# their order and the resulting schema are unchanged.
# NOTE(review): blank fields presumably become null under this cast, which is what
# the null counts in the statistics table rely on — confirm for the Spark version in use.
for _int_col in ["int_{}".format(i) for i in range(1, 14)]:
    trainDF = trainDF.withColumn(_int_col, trainDF[_int_col].cast(types.IntegerType()))


In [None]:
# %%time
# trainDF.groupby('int_1').count().sort("count",ascending=False).show(10)


In [148]:
# Render the summary-statistics table for the integer columns of the training data.
f_display_stats(trainDF)

Unnamed: 0,max,mean,min,nulls_nans,pct_nulls_nans,skewness,stddev,var
int_1,5775,3.5,0,20793556,45.36,27.88,9.43,88.91
int_2,257675,105.85,-3,0,0.0,13.2,391.46,153239.22
int_3,65535,26.91,0,9839447,21.46,81.49,397.97,158382.17
int_4,969,7.32,0,9937369,21.68,4.09,8.79,77.32
int_5,23159456,18538.99,0,1183117,2.58,10.1,69394.6,4815610657.56
int_6,431037,116.06,0,10252328,22.37,184.98,382.57,146357.08
int_7,56311,16.33,0,1982866,4.33,46.39,66.05,4362.57
int_8,6047,12.52,0,22773,0.05,66.16,16.69,278.52
int_9,29019,106.11,0,1982866,4.33,8.52,220.28,48524.64
int_10,11,0.62,0,20793556,45.36,1.14,0.68,0.47


### Checking stats for columns

In [None]:
# Let's look at some of the data in histograms
# def plot_hist(hist_list):
#     pd.DataFrame(
#         list(zip(*hist_list)), 
#         columns=['bin', 'frequency']
#     ).set_index(
#         'bin'
#     ).plot(kind='bar');

In [None]:
# temp_hist = trainDF.select('int_1_log').rdd.flatMap(lambda x: x).histogram(20)


In [None]:
# plot_hist(temp_hist)


In [None]:
# tempDF = trainDF.select(["int_1"]).collect()


In [None]:
# fig, axes = plt.subplots(nrows=1, ncols=1)
# fig.set_size_inches(5, 5)
# hist(axes, x=tempDF)

