## Sample PySpark Code Syntax

#### A Jupyter notebook of sample PySpark syntax

#### Initiate spark session

In [None]:
%%configure -
{
    "name": "SAMPLE PYSPARK APP NAME",
    "driverMemory": "4G",
    "executorMemory": "20G",
    "numExecutors": 10,
    "executorCores": 10
}

#### Basic imports

In [None]:
from pyspark.sql import SQLContext
from pyspark.sql import functions as sf
from pyspark.sql.types import BooleanType, StringType, IntegerType, DateType, StructField, StructType
from pyspark.sql.functions import col, when, concat_ws

#### Print to notebook

In [None]:
# Print a labelled value; `print` separates its arguments with a single space.
some_var = "abcd"
print("Sample line to be printed:", some_var)

#### Build the input path (a glob pattern) for your desired date range

In [None]:
# Input location = cluster root + events directory + a date glob.
# Edit `date_glob` to select a different date range (presumably the file or
# directory names start with the date — confirm against the storage layout).
# NOTE(review): `cluster_path` and `all_events_path_18_19` are presumably set
# in a config cell not shown here — define them before running.
date_glob = "2019-01-01*"
path = cluster_path + all_events_path_18_19 + date_glob

#### Read Parquet/CSV

In [None]:
# Load the Parquet data at `path` and keep only the columns used below.
# `.select(*cols)` is the explicit form of indexing a DataFrame with a list.
columns_to_read = [
    "event",
    "gaid",
    "media_id",
    "duration_seconds",
    "lr_age",
]
data = spark.read.parquet(path).select(*columns_to_read)

#### Filter

In [None]:
# Keep "mediaReady" events whose duration is below 84000 seconds.
# One filter combining both conditions with `&` keeps exactly the rows that
# two chained filters would keep.
# NOTE(review): 84000 s is about 23.3 h; if a one-day cap (86400 s) was
# intended, confirm.
data = data.filter(
    (col("event") == "mediaReady") & (col("duration_seconds") < 84000)
)

#### Filter with multiple conditions

In [None]:
## OR: keep rows where at least one of the two columns is populated.
# Every condition sits in its own parentheses because Python's `|` and `&`
# bind tighter than comparison operators such as `>` and `<`.
data = data.filter((data.gaid.isNotNull()) | (data.lr_age.isNotNull()))

## AND: keep rows whose duration lies strictly between 0 and 84000 seconds.
# NOTE(review): 84000 s is about 23.3 h; if a one-day cap (86400 s) was
# intended, confirm.
data = data.filter((data.duration_seconds > 0) & (data.duration_seconds < 84000))

#### Filter by a user-defined criterion (function)

In [None]:
def filter_function(filter_column_1, filter_column_2):
    """Row predicate for a Spark UDF: keep the row only when both values are non-null.

    Replace the body with your own criterion; it must return a bool.

    Args:
        filter_column_1: value of the first column for one row (None if null).
        filter_column_2: value of the second column for one row (None if null).

    Returns:
        True to keep the row, False to drop it.
    """
    return filter_column_1 is not None and filter_column_2 is not None

# Wrap the Python predicate as a boolean-returning Spark UDF, then keep the
# rows for which it evaluates to True.
# NOTE(review): `complete_data` is presumably created elsewhere — define it
# before running. For a plain null check, `col(...).isNotNull()` avoids a
# Python UDF entirely.
udf_null_filter = sf.udf(filter_function, returnType=BooleanType())
complete_media_data = complete_data.filter(
    udf_null_filter(col("event"), col("duration_seconds"))
)


#### Select columns

In [None]:
# Keep only the three columns needed downstream.
# NOTE(review): `complete_data` is presumably created elsewhere — confirm.
data = complete_data.select('gaid', 'media_id', 'duration_seconds')

#### Distinct of a dataframe

In [None]:
# Drop rows that are exact duplicates across every column.
distinct_data = data.distinct()

# Keep one row per (gaid, media_id) pair. The subset must name columns that
# exist in `data`; after the column select above, `lr_age` is no longer
# present, so de-duplicating on it would fail.
selective_distinct_data = data.dropDuplicates(['gaid', 'media_id'])

#### Check the DataFrame structure (an alternative to `str(df)` in R)

In [None]:
# Print the schema (column names, types, nullability) as a tree — the PySpark
# counterpart of R's `str(df)`. `data.dtypes` gives (name, type) pairs instead.
data.printSchema()

#### Rename column

In [None]:
# Rename one column; both names here are placeholders. Per the PySpark docs
# this is a no-op if `existing` is not in the schema.
data = data.withColumnRenamed(existing="from_column_name", new="to_column_name")

#### Aggregations

In [None]:
# Total watch time (sum of duration_seconds) per (gaid, media_id) pair.
# `.alias` names the result column directly instead of renaming Spark's
# auto-generated "sum(...)" column afterwards, which breaks silently if that
# generated name ever differs.
data = (
    data.groupBy("gaid", "media_id")
    .agg(sf.sum("duration_seconds").alias("total_wt_per_media_id"))
)

#### Union of dataframes

In [None]:
# Stack the rows of two DataFrames (UNION ALL semantics: duplicates are kept).
# `unionByName` matches columns by name; plain `union` matches them by
# position and would silently mix columns if the two frames list them in a
# different order.
# NOTE(review): `data_part_1` / `data_part_2` are presumably defined elsewhere.
all_data = data_part_1.unionByName(data_part_2)

#### Create SQL table 

In [None]:
# Persist `data` as a table: register it as a temporary view, then run a
# CREATE TABLE ... AS SELECT over that view.
# NOTE(review): without IF NOT EXISTS this presumably fails when the table
# already exists — confirm for your catalog.
project_prefix = "001_SAMPLE_PROJECT_"
table_name = "".join([project_prefix, "SAMPLE_TABLE_NAME"])

data.createOrReplaceTempView("data_temp_view")
create_table_sql = "CREATE TABLE {0} AS SELECT * FROM data_temp_view".format(table_name)
spark.sql(create_table_sql)

#### Write to CSV

In [None]:
# Write `data` as CSV under `csv_write_path` (Spark writes a directory of
# part files there).
# `coalesce(1)` collapses the data to one partition, so a single part file is
# produced. All rows then go through one task, so only do this for small outputs.
csv_write_path = "/tmp/YOUR_NAME_HERE/FILE_NAME_HERE"
number_of_partitions = 1

# Save to `csv_write_path`; the original referenced an undefined `csv_data_path`.
# On Spark 2+ `.format("csv")` is the built-in equivalent of this format name.
# Add `.option("header", True)` to also write the column names.
data.coalesce(number_of_partitions).write.format('com.databricks.spark.csv').save(csv_write_path)