# Profiling Big Data in distributed environment using Spark: A Pyspark Data Primer
This notebook accompanies the post published at (url).

In [2]:
import findspark
findspark.init()   # makes pyspark importable as a regular library
#Get a Spark session
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("shaheen_dp") \
    .config("spark.master","yarn") \
    .config("spark.submit.deployMode","client")\
    .config("spark.yarn.queue", "yourqueue") \
    .config("spark.executor.instances","50",) \
    .enableHiveSupport()\
    .getOrCreate()    

In [4]:
spark

<pyspark.sql.session.SparkSession at 0x7fae3bfd36d0>

In [6]:
import platform, sys, os
print('Platform = ',platform.platform())  
print('Version of Spark = ',spark.version)
print('Python version = ',sys.version)

('Platform = ', 'Linux-2.6.32-754.14.2.el6.x86_64-x86_64-with-redhat-6.10-Santiago')
('Version of Spark = ', u'2.1.1.2.6.2.0-205')
('Python version = ', '2.7.13 |Continuum Analytics, Inc.| (default, Dec 20 2016, 23:09:15) \n[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)]')


# Load the hive table to be profiled as a pyspark dataframe

In [7]:
schema_name = 'some_transaction_schema'
table_name = 'some_pos_table'  
query = 'select * from ' + schema_name + '.' + table_name 
print('hive query -> ' + query)
df = spark.sql(query)
print('The hive table ' + table_name +' is loaded as a ',type(df))

hive query -> select * from some_transaction_schema.some_pos_table
('The hive table some_pos_table is loaded as a ', <class 'pyspark.sql.dataframe.DataFrame'>)


# Data Profile Function

In [33]:
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import isnan, when, count, col

def dataprofile(data_all_df,data_cols):
    data_df = data_all_df.select(data_cols)
    columns2Bprofiled = data_df.columns
    global schema_name, table_name
    if not 'schema_name' in globals():
        schema_name = 'schema_name'
    if not 'table_name' in globals():
        table_name = 'table_name' 
    dprof_df = pd.DataFrame({'schema_name':[schema_name] * len(data_df.columns),\
                             'table_name':[table_name] * len(data_df.columns),\
                             'column_names':data_df.columns,\
                             'data_types':[x[1] for x in data_df.dtypes]}) 
    dprof_df = dprof_df[['schema_name','table_name','column_names', 'data_types']]
    dprof_df.set_index('column_names', inplace=True, drop=False)
    # ======================
    num_rows = data_df.count()
    dprof_df['num_rows'] = num_rows
    # ======================    
    # number of rows with nulls and nans   
    df_nacounts = data_df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data_df.columns \
                                  if data_df.select(c).dtypes[0][1]!='timestamp']).toPandas().transpose()
    df_nacounts = df_nacounts.reset_index()  
    df_nacounts.columns = ['column_names','num_null']
    dprof_df = pd.merge(dprof_df, df_nacounts, on = ['column_names'], how = 'left')
    # ========================
    # number of rows with white spaces (one or more space) or blanks
    num_spaces = [data_df.where(F.col(c).rlike('^\\s+$')).count() for c in data_df.columns]
    dprof_df['num_spaces'] = num_spaces
    num_blank = [data_df.where(F.col(c)=='').count() for c in data_df.columns]
    dprof_df['num_blank'] = num_blank
    # =========================
    # using the in built describe() function 
    desc_df = data_df.describe().toPandas().transpose()
    desc_df.columns = ['count', 'mean', 'stddev', 'min', 'max']
    desc_df = desc_df.iloc[1:,:]  
    desc_df = desc_df.reset_index()  
    desc_df.columns.values[0] = 'column_names'  
    desc_df = desc_df[['column_names','count', 'mean', 'stddev']] 
    dprof_df = pd.merge(dprof_df, desc_df , on = ['column_names'], how = 'left')
    # ===========================================
    allminvalues = [data_df.select(F.min(x)).limit(1).toPandas().iloc[0][0] for x in columns2Bprofiled]
    allmaxvalues = [data_df.select(F.max(x)).limit(1).toPandas().iloc[0][0] for x in columns2Bprofiled]
    allmincounts = [data_df.where(col(x) == y).count() for x,y in zip(columns2Bprofiled, allminvalues)]
    allmaxcounts = [data_df.where(col(x) == y).count() for x,y in zip(columns2Bprofiled, allmaxvalues)]    
    df_counts = dprof_df[['column_names']]
    df_counts.insert(loc=0, column='min', value=allminvalues)
    df_counts.insert(loc=0, column='counts_min', value=allmincounts)
    df_counts.insert(loc=0, column='max', value=allmaxvalues)
    df_counts.insert(loc=0, column='counts_max', value=allmaxcounts)
    df_counts = df_counts[['column_names','min','counts_min','max','counts_max']]
    dprof_df = pd.merge(dprof_df, df_counts , on = ['column_names'], how = 'left')  
    # ==========================================
    # number of distinct values in each column
    dprof_df['num_distinct'] = [data_df.select(x).distinct().count() for x in columns2Bprofiled]
    # ============================================
    # most frequently occuring value in a column and its count
    dprof_df['most_freq_valwcount'] = [data_df.groupBy(x).count().sort("count",ascending=False).limit(1).\
                                       toPandas().iloc[0].values.tolist() for x in columns2Bprofiled]
    dprof_df['most_freq_value'] = [x[0] for x in dprof_df['most_freq_valwcount']]
    dprof_df['most_freq_value_count'] = [x[1] for x in dprof_df['most_freq_valwcount']]
    dprof_df = dprof_df.drop(['most_freq_valwcount'],axis=1)
    # least frequently occuring value in a column and its count
    dprof_df['least_freq_valwcount'] = [data_df.groupBy(x).count().sort("count",ascending=True).limit(1).\
                                        toPandas().iloc[0].values.tolist() for x in columns2Bprofiled]
    dprof_df['least_freq_value'] = [x[0] for x in dprof_df['least_freq_valwcount']]
    dprof_df['least_freq_value_count'] = [x[1] for x in dprof_df['least_freq_valwcount']]
    dprof_df = dprof_df.drop(['least_freq_valwcount'],axis=1)

    return dprof_df

In [None]:
# Driver code for the data profle function
import time
start = time.time()
cols2profile = df.columns  # select all or some columns from the table
dprofile = dataprofile(df, cols2profile)
end = time.time()
print('Time taken to execute dataprofile function ', (end - start)/60,' minutes')

In [None]:
import pandas as pd
dprofile = pd.read_csv('profile_ncpdp_acv_txn2_v3.csv')

In [12]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [11]:
dprofile

Unnamed: 0,schema_name,table_name,column_names,data_types,num_rows,num_null,num_spaces,num_blank,count,mean,stddev,min,counts_min,max,counts_max,num_distinct,most_freq_value,most_freq_value_count,least_freq_value,least_freq_value_count
0,some_transaction_schema,some_pos_table,pos_Col1,timestamp,543358805,,0,0,,,,1/1/2018 0:00,3241693,3/25/2018 0:00,3741132,90,2/5/2018 0:00,8254090,2/4/2018 0:00,3223683
1,some_transaction_schema,some_pos_table,pos_Col2,string,543358805,88470.0,69,0,543270335.0,1.1,1.4,,1659,D,13,11,H,537773592,8,3
2,some_transaction_schema,some_pos_table,pos_Col3,int,543358805,180552292.0,0,0,362806513.0,1387804.0,770134.0,1,8,5487485,1,4796157,,180552292,4474845,1
3,some_transaction_schema,some_pos_table,pos_Col4,string,543358805,88441.0,40439856,0,543270364.0,40607890.0,421806000.0,,40439856,YZZ3333686,328,1836,BEW,96038409,JRQ,1
4,some_transaction_schema,some_pos_table,pos_Col5,smallint,543358805,27.0,0,0,543358778.0,2.1,2.3,0,9442382,218,1,220,1,288931621,215,1
5,some_transaction_schema,some_pos_table,pos_Col6,bigint,543358805,442471.0,0,0,542916334.0,5619366000.0,3651901000.0,3,9,11240178403,3,50425488,,442471,9954535204,1
6,some_transaction_schema,some_pos_table,pos_Col7,int,543358805,88441.0,0,0,543270364.0,240855.4,294403.0,1553,1678060,900020,236945,447,4336,144268567,8985,2
7,some_transaction_schema,some_pos_table,pos_Col8,string,543358805,543358799.0,0,0,6.0,,,M,6,M,6,2,,543358799,M,6
8,some_transaction_schema,some_pos_table,pos_Col9,bigint,543358805,494029537.0,0,0,49329268.0,28645970.0,38663250.0,0,1,228907306,2,1255706,,494029537,24011199,1
9,some_transaction_schema,some_pos_table,pos_Col10,string,543358805,0.0,0,0,543358805.0,,,B0,543358805,B0,543358805,1,B0,543358805,B0,543358805


In [14]:
file_out = table_name + '.csv'
file_out = os.path.join(os.getcwd(), file_out)
dprofile.to_csv(file_out, index=False)
print('Data profile written to ',file_out)

('Data profile written to ', 'C:/Users/shaheen/Documents/Spark/Pyspark/some_pos_table.csv')


# Profiling the New York City Taxi Trip Duration data downloaded from [kaggle](https://www.kaggle.com/c/nyc-taxi-trip-duration/data)

In [None]:
df_taxi = spark.read.csv('nytaxi.csv', inferSchema=True, header=True)

In [27]:
df_taxi.count(), len(df_taxi.columns)

(1458644, 11)

In [35]:
import time
start = time.time()
cols2profile = df_taxi.columns
dprofile = dataprofile(df_taxi,cols2profile)
end = time.time()
print('Time taken to execute dataprofile function ', (end - start)/60,' minutes')

('Time taken to execute dataprofile function ', 0.6450139999389648, ' minutes')


In [18]:
dprofile

Unnamed: 0,schema_name,table_name,column_names,data_types,num_rows,num_null,num_spaces,num_blank,count,mean,stddev,min,counts_min,max,counts_max,num_distinct,most_freq_value,most_freq_value_count,least_freq_value,least_freq_value_count
0,schema_name,table_name,id,string,1458644,0.0,0,0,1458644.0,,,id0000001,1,id4000000,1,1458644,id3064780,1.0,id2136901,1.0
1,schema_name,table_name,vendor_id,int,1458644,0.0,0,0,1458644.0,1.53495,0.498777,1,678342,2,780302,2,2,780302.0,1,678342.0
2,schema_name,table_name,pickup_datetime,timestamp,1458644,,0,0,,,,2016-01-01 00:00:17,1,2016-06-30 23:59:39,1,1380222,2016-06-10 23:17:17,5.0,2016-05-28 21:21:48,1.0
3,schema_name,table_name,dropoff_datetime,timestamp,1458644,,0,0,,,,2016-01-01 00:03:31,1,2016-07-01 23:02:03,1,1380377,2016-02-19 19:25:04,5.0,2016-06-27 19:42:56,1.0
4,schema_name,table_name,passenger_count,int,1458644,0.0,0,0,1458644.0,1.66453,1.314242,0,60,9,1,10,1,1033540.0,8,1.0
5,schema_name,table_name,pickup_longitude,double,1458644,0.0,0,0,1458644.0,-73.973486,0.070902,-121.93334197998047,1,-61.33552932739258,1,23047,-73.9822006225586,633.0,-73.82072448730469,1.0
6,schema_name,table_name,pickup_latitude,double,1458644,0.0,0,0,1458644.0,40.750921,0.032881,34.35969543457031,1,51.88108444213867,1,45245,40.77410125732422,414.0,40.66127395629883,1.0
7,schema_name,table_name,dropoff_longitude,double,1458644,0.0,0,0,1458644.0,-73.973416,0.070643,-121.93330383300781,1,-61.33552932739258,1,33821,-73.98233032226562,443.0,-73.82982635498047,1.0
8,schema_name,table_name,dropoff_latitude,double,1458644,0.0,0,0,1458644.0,40.7518,0.035891,32.1811408996582,1,43.92102813720703,1,62519,40.77431106567383,269.0,40.61756134033203,1.0
9,schema_name,table_name,store_and_fwd_flag,string,1458644,0.0,0,0,1458644.0,,,N,1450599,Y,8045,2,N,1450599.0,Y,8045.0


In [13]:
file_out = "profile_taxi.csv"
file_out = os.path.join(os.getcwd() + file_out)
dprofile.to_csv(file_out, index=False)
print('Data profile written to ',file_out)

('Data profile written to ', 'C:/Users/shaheen/Documents/Spark/Pyspark/profile_taxi.csv')


In [None]:
spark.stop()