In [2]:
import findspark
findspark.init()

In [3]:
# Create a SparkSession and get its SparkContext
from pyspark.sql import SparkSession
spark = SparkSession.builder\
                    .master("local")\
                    .appName('Firstprogram')\
                    .getOrCreate()
sc=spark.sparkContext



# WordCount Problem

Our requirement is to write a small program that displays the number of occurrences of each word in the given input file.

In [15]:
# Read text file to create RDD
text_file = sc.textFile('data/wordCountTest.txt', minPartitions=2)

In [17]:
# print number of partitions
text_file.getNumPartitions()

2

In [19]:
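# Count each word: split lines into words, pair each lower-cased word with 1, sum per word.
# split(" ") also yields empty strings (blank lines, repeated spaces), most likely the ": 4" entry below.
word_counts = (text_file.flatMap(lambda line: line.split(" "))
               .map(lambda word: (word.lower(), 1))
               .reduceByKey(lambda x, y: x + y))
word_counts_sorted = word_counts.sortBy(lambda x: x[1], ascending=False)  # highest count first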

In [23]:
# Printing each word with its respective count
output_rdd = word_counts_sorted.take(10)
for word, count in output_rdd:
    print(f"{word}: {count}")

the: 8
to: 7
in: 5
with: 5
: 4
and: 4
jupyter: 3
start: 3
pyspark: 3
our: 3
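To reason about what the RDD pipeline computes without a cluster, the same flatMap, map, reduceByKey and sortBy steps can be mimicked in plain Python. This is a minimal sketch, not Spark code: the hypothetical `word_count()` helper works on an in-memory list of lines, and the sample lines are made up. It also reproduces the empty-string token that `split(" ")` produces for a blank line.

```python
from collections import defaultdict


def word_count(lines):
    """Plain-Python mirror of flatMap/map/reduceByKey/sortBy (hypothetical helper)."""
    # flatMap: split every line into tokens, exactly like line.split(" ")
    tokens = [word for line in lines for word in line.split(" ")]
    # map: pair each lower-cased token with 1
    pairs = [(word.lower(), 1) for word in tokens]
    # reduceByKey: add up the 1s per word
    counts = defaultdict(int)
    for word, one in pairs:
        counts[word] += one
    # sortBy count, highest first (ties keep first-seen order here; Spark gives no such guarantee)
    return sorted(counts.items(), key=lambda kv: kv[1], reverse=True)


sample_lines = ["The cat and the hat", "", "the end"]  # made-up input
result = word_count(sample_lines)
print(result[:2])
```

The blank line contributes one `("", 1)` pair, which is the same effect that shows up as `: 4` in the Spark output.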


# Add Leading Zeros to a Column in a Spark DataFrame

Consider an input Spark DataFrame with two columns, `name` and `score`, created in the cell below.


Our requirement is to add leading zeros to the `score` column so that every value has three digits, for example:
- convert 89 to 089
- convert 8 to 008

In [25]:
# Create input Spark Dataframe
list_data=[["Babu",20],["Raja",8],["Mani",75],["Kalam",100],["Zoin",7],["Kal",53]]
df1=spark.createDataFrame(list_data,["name","score"])
df1.show()

+-----+-----+
| name|score|
+-----+-----+
| Babu|   20|
| Raja|    8|
| Mani|   75|
|Kalam|  100|
| Zoin|    7|
|  Kal|   53|
+-----+-----+



#### Using format_string()

`format_string()` formats its arguments printf-style and returns the result as a string column. To add leading zeros (zero padding) to a column, import it from `pyspark.sql.functions` and apply a zero-padded format such as `%03d`.

**General Syntax**:
format_string(format,*cols)

**Parameters**:
* format (str): a format string with embedded printf-style tags such as %d or %s
* cols (Column or str): one or more columns whose values are substituted into the format string

Coming back to our use case, the following snippet solves the problem statement.

In [26]:
from pyspark.sql.functions import format_string

In [27]:
df2 = df1.withColumn("score_000", format_string("%03d", "score")) 
df2.show()

+-----+-----+---------+
| name|score|score_000|
+-----+-----+---------+
| Babu|   20|      020|
| Raja|    8|      008|
| Mani|   75|      075|
|Kalam|  100|      100|
| Zoin|    7|      007|
|  Kal|   53|      053|
+-----+-----+---------+
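For this `%03d` case, `format_string()` follows the same printf conventions as Python's `%` operator, so its effect can be checked quickly in plain Python. The sketch below (plain Python, not Spark) shows that the width is a minimum: a value with more than three digits, here a hypothetical 1000, is kept whole rather than truncated.

```python
scores = [20, 8, 75, 100, 7, 53, 1000]  # the scores above plus a hypothetical 4-digit value

# "%03d": integer, zero-padded to a minimum width of 3
padded = ["%03d" % s for s in scores]
print(padded)
```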



#### Using lpad()

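`lpad()` left-pads a string column with the pad string until it reaches the specified length. If a value is already longer than `len`, it is truncated to `len` characters.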

**General Syntax**:
lpad(col,len,pad)

**Parameters**:
* col - column (or column name) to be padded
* len - length of the resulting value after padding
* pad - string to pad with (such as "0" or "#")

In [28]:
from pyspark.sql.functions import lpad

In [30]:
df2 = df1.withColumn("score_000", lpad("score", 3, "0"))                 # Note that we are using string 0
df2.show()

+-----+-----+---------+
| name|score|score_000|
+-----+-----+---------+
| Babu|   20|      020|
| Raja|    8|      008|
| Mani|   75|      075|
|Kalam|  100|      100|
| Zoin|    7|      007|
|  Kal|   53|      053|
+-----+-----+---------+
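One behaviour worth knowing: per the Spark SQL documentation, `lpad` shortens values that are longer than `len`. The hypothetical `spark_like_lpad()` below is a plain-Python sketch of that rule, assuming a non-empty pad string; it is meant for reasoning about edge cases, not as Spark's implementation.

```python
def spark_like_lpad(value, length, pad):
    """Plain-Python sketch of lpad semantics (hypothetical helper)."""
    if not pad:
        raise ValueError("this sketch assumes a non-empty pad string")
    if len(value) >= length:
        # Values that are already long enough are cut down to `length` characters
        return value[:length]
    missing = length - len(value)
    # Repeat the pad as often as needed, then trim it to the missing width
    return (pad * (missing // len(pad) + 1))[:missing] + value


print([spark_like_lpad(str(s), 3, "0") for s in [20, 8, 100, 1000]])
```

A 4-digit score would therefore silently lose its last digit with `lpad(..., 3, "0")`, unlike `format_string("%03d", ...)`.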



#### Using concat() + substring()

In [31]:
from pyspark.sql.functions import concat,substring,lit

In [37]:
# Prefix every score with "00" (concat casts the integer score to a string)
df2 = df1.withColumn("score_000", concat(lit("00"), "score"))
df2.show()

+-----+-----+---------+
| name|score|score_000|
+-----+-----+---------+
| Babu|   20|     0020|
| Raja|    8|      008|
| Mani|   75|     0075|
|Kalam|  100|    00100|
| Zoin|    7|      007|
|  Kal|   53|     0053|
+-----+-----+---------+



In [41]:
# A negative start position counts from the end: keep the last 3 characters
df3 = df2.withColumn("score_000", substring("score_000", -3, 3))
df3.show()

+-----+-----+---------+
| name|score|score_000|
+-----+-----+---------+
| Babu|   20|      020|
| Raja|    8|      008|
| Mani|   75|      075|
|Kalam|  100|      100|
| Zoin|    7|      007|
|  Kal|   53|      053|
+-----+-----+---------+
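The concat + substring approach amounts to "prepend zeros, then keep the last three characters". Sketched in plain Python, with negative slicing standing in for `substring(col, -3, 3)` and a hypothetical `pad_by_concat()` helper, it also shows the trade-off: a value with more than three digits loses its leading digits.

```python
def pad_by_concat(score, width=3):
    """Plain-Python sketch of concat(lit("00"), score) followed by substring(..., -3, 3)."""
    # concat: prepend width - 1 zeros ("00" for width 3)
    prefixed = "0" * (width - 1) + str(score)
    # substring with a negative start: keep the last `width` characters
    return prefixed[-width:]


print([pad_by_concat(s) for s in [20, 8, 100, 1000]])
```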



# Replace a String in Spark DataFrame

Consider the DataFrame of personal transactions loaded below. Our requirement is to replace the string value `Checking` in the `Card_type` column with `Cash`.

In [46]:
# SparkFiles resolves the local path of files added with addFile()
from pyspark import SparkFiles

In [43]:
personal_transactions_csv = "https://raw.githubusercontent.com/azar-s91/dataset/master/personal_transactions.csv"

In [47]:
# For a small file, addFile() downloads it from the URL and makes it available on every node
spark.sparkContext.addFile(personal_transactions_csv)

In [50]:
# SparkFiles.get(file_name) returns the local path of the downloaded file
df1 = spark.read.format("csv").option("header", "true").load(SparkFiles.get("personal_transactions.csv"))

In [52]:
df1.show(10)

+-----------+-------------+--------+-------------------+----------------+-------+
|Customer_No|    Card_type|    Date|           Category|Transaction Type| Amount|
+-----------+-------------+--------+-------------------+----------------+-------+
|    1000501|Platinum Card|1/1/2018|           Shopping|           debit|  11.11|
|    1000501|     Checking|1/2/2018|    Mortgage & Rent|           debit|1247.44|
|    1000501|  Silver Card|1/2/2018|        Restaurants|           debit|  24.22|
|    1000501|Platinum Card|1/3/2018|Credit Card Payment|          credit|2298.09|
|    1000501|Platinum Card|1/4/2018|      Movies & DVDs|           debit|  11.76|
|    1000501|  Silver Card|1/5/2018|        Restaurants|           debit|  25.85|
|    1000501|  Silver Card|1/6/2018|   Home Improvement|           debit|  18.45|
|    1000501|     Checking|1/8/2018|          Utilities|           debit|     45|
|    1000501|  Silver Card|1/8/2018|   Home Improvement|           debit|  15.38|
|    1000501|Pla

#### Method 1: Using `na.replace`

In [53]:
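# Replace cells whose entire value is "Checking"; subset limits the replacement to Card_type
df2 = df1.na.replace("Checking", "Cash", subset=["Card_type"])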

In [54]:
df2.show(10)

+-----------+-------------+--------+-------------------+----------------+-------+
|Customer_No|    Card_type|    Date|           Category|Transaction Type| Amount|
+-----------+-------------+--------+-------------------+----------------+-------+
|    1000501|Platinum Card|1/1/2018|           Shopping|           debit|  11.11|
|    1000501|         Cash|1/2/2018|    Mortgage & Rent|           debit|1247.44|
|    1000501|  Silver Card|1/2/2018|        Restaurants|           debit|  24.22|
|    1000501|Platinum Card|1/3/2018|Credit Card Payment|          credit|2298.09|
|    1000501|Platinum Card|1/4/2018|      Movies & DVDs|           debit|  11.76|
|    1000501|  Silver Card|1/5/2018|        Restaurants|           debit|  25.85|
|    1000501|  Silver Card|1/6/2018|   Home Improvement|           debit|  18.45|
|    1000501|         Cash|1/8/2018|          Utilities|           debit|     45|
|    1000501|  Silver Card|1/8/2018|   Home Improvement|           debit|  15.38|
|    1000501|Pla

#### Method 2: Using `regexp_replace`

In [55]:
from pyspark.sql.functions import regexp_replace

In [56]:
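# regexp_replace substitutes every regex match inside the value and writes the result to a new column
df2 = df1.withColumn("Card_type_repl", regexp_replace("Card_type", "Checking", "Cash"))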

In [57]:
df2.show(10)

+-----------+-------------+--------+-------------------+----------------+-------+--------------+
|Customer_No|    Card_type|    Date|           Category|Transaction Type| Amount|Card_type_repl|
+-----------+-------------+--------+-------------------+----------------+-------+--------------+
|    1000501|Platinum Card|1/1/2018|           Shopping|           debit|  11.11| Platinum Card|
|    1000501|     Checking|1/2/2018|    Mortgage & Rent|           debit|1247.44|          Cash|
|    1000501|  Silver Card|1/2/2018|        Restaurants|           debit|  24.22|   Silver Card|
|    1000501|Platinum Card|1/3/2018|Credit Card Payment|          credit|2298.09| Platinum Card|
|    1000501|Platinum Card|1/4/2018|      Movies & DVDs|           debit|  11.76| Platinum Card|
|    1000501|  Silver Card|1/5/2018|        Restaurants|           debit|  25.85|   Silver Card|
|    1000501|  Silver Card|1/6/2018|   Home Improvement|           debit|  18.45|   Silver Card|
|    1000501|     Checking|1/8

#### Method 3: Using Case When

In [58]:
from pyspark.sql.functions import when,col,lit

In [59]:
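# rlike() is true when the regex matches anywhere in the value; col("Card_type") == "Checking" would require an exact match
df2 = df1.withColumn("Card_type_repl",
                     when(col("Card_type").rlike("Checking"), lit("Cash")).otherwise(col("Card_type")))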

In [61]:
df2.show(10)

+-----------+-------------+--------+-------------------+----------------+-------+--------------+
|Customer_No|    Card_type|    Date|           Category|Transaction Type| Amount|Card_type_repl|
+-----------+-------------+--------+-------------------+----------------+-------+--------------+
|    1000501|Platinum Card|1/1/2018|           Shopping|           debit|  11.11| Platinum Card|
|    1000501|     Checking|1/2/2018|    Mortgage & Rent|           debit|1247.44|          Cash|
|    1000501|  Silver Card|1/2/2018|        Restaurants|           debit|  24.22|   Silver Card|
|    1000501|Platinum Card|1/3/2018|Credit Card Payment|          credit|2298.09| Platinum Card|
|    1000501|Platinum Card|1/4/2018|      Movies & DVDs|           debit|  11.76| Platinum Card|
|    1000501|  Silver Card|1/5/2018|        Restaurants|           debit|  25.85|   Silver Card|
|    1000501|  Silver Card|1/6/2018|   Home Improvement|           debit|  18.45|   Silver Card|
|    1000501|     Checking|1/8
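The three methods differ in what they match, which matters once a value merely contains `Checking`. The plain-Python sketch below mimics each one with the `re` module; the value `Checking Account` is hypothetical and only added to show where the methods diverge.

```python
import re

# "Checking Account" is a hypothetical value, added to show where the methods differ
values = ["Platinum Card", "Checking", "Silver Card", "Checking Account"]

# Method 1, na.replace: only cells whose whole value equals "Checking" change
na_replace = ["Cash" if v == "Checking" else v for v in values]

# Method 2, regexp_replace: every regex match inside the value is substituted
regexp = [re.sub("Checking", "Cash", v) for v in values]

# Method 3, when(rlike(...)): if the pattern occurs anywhere, the whole value becomes "Cash"
case_when = ["Cash" if re.search("Checking", v) else v for v in values]

for row in zip(values, na_replace, regexp, case_when):
    print(row)
```

In the rows shown above, `Checking` only appears as a complete value, so all three methods produce the same result there.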

# Remove the First N Lines of a File

Consider a web-page traffic report generated every day that contains analytics information such as sessions, page views, and unique page views. The first lines of the report, printed by `rdd1.take(10)` below, hold metadata rather than data. To load the report into a Spark DataFrame, we need to skip these first seven lines as well as the column-header line that follows them, and then supply our own column names. The question is how to handle this kind of file and load the remaining data into a Spark DataFrame.

In [63]:
from pyspark import SparkFiles

In [62]:
pageview_csv = "https://raw.githubusercontent.com/azar-s91/dataset/master/pageview.csv"

In [64]:
sc.addFile(pageview_csv)

In [77]:
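# Read the file as an RDD with at least 2 partitions and split each line on commas.
# A plain split(",") ignores CSV quoting, so the quoted description line is split into several fields.
rdd1 = sc.textFile(SparkFiles.get('pageview.csv'), 2).map(lambda line: line.split(","))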

In [70]:
rdd1.getNumPartitions()

2

In [72]:
rdd1.take(10)

[['Site', 'www.learntospark.com'],
 ['Desccription', '"Complete guide to learn Spark', 'AI', 'ML"'],
 ['Page Views of each blog'],
 ['20200817-20200817'],
 [''],
 ['Total data in page', '12'],
 [''],
 ['Page', 'Date', 'Pageviews', 'Unique_Pageviews', 'Sessions'],
 ['Guide to Install Spark', '2020-08-17', '1140', '986', '800'],
 ['Spark MAP vs FlatMap', '2020-08-17', '836', '800', '128']]

In [82]:
""" mapPartitionsWithIndex passes each partition's index together with an iterator over its rows.
For the first partition (index 0) we drop rows 0 to 7, i.e. the seven metadata lines and the header row;
every other partition is returned unchanged."""

rdd2 = rdd1.mapPartitionsWithIndex(lambda id_x, itr: list(itr)[8:] if id_x == 0 else itr)
rdd2.take(10)

[['Guide to Install Spark', '2020-08-17', '1140', '986', '800'],
 ['Spark MAP vs FlatMap', '2020-08-17', '836', '800', '128'],
 ['Spark Architechture', '2020-08-17', '1569', '1345', '1400'],
 ['Azure Function for Mp3 to text', '2020-08-17', '350', '245', '234'],
 ['Scala Vs Python', '2020-08-17', '200', '150', '130'],
 ['Spark Window Function', '2020-08-17', '789', '546', '560'],
 ['Natural Language Processing', '2020-08-17', '467', '456', '100'],
 ['Spark Linear Interpolation - Time Series',
  '2020-08-17',
  '698',
  '345',
  '349'],
 ['Spark case statement', '2020-08-17', '234', '196', '120'],
 ['Spark Scenario Based Questions', '2020-08-17', '712', '329', '137']]
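The skip logic can be sketched in plain Python by modelling the RDD as a list of partitions. The hypothetical `drop_first_n()` uses `itertools.islice` instead of `list(itr)[8:]`, which skips rows lazily rather than loading the whole first partition into memory; the same shape (`lambda idx, it: islice(it, 8, None) if idx == 0 else it`) should also work inside `mapPartitionsWithIndex`, although that variant is not run here. Like the original, it assumes all eight lines to be skipped fall in partition 0.

```python
from itertools import islice


def drop_first_n(partition_index, rows, n=8):
    """Skip the first n rows of partition 0 and pass other partitions through (hypothetical helper)."""
    return islice(rows, n, None) if partition_index == 0 else rows


# Two made-up partitions: 7 metadata lines + 1 header line, then data rows
partitions = [
    [f"meta {i}" for i in range(7)] + ["header", "row 1"],
    ["row 2", "row 3"],
]

# Equivalent of rdd.mapPartitionsWithIndex(drop_first_n).collect()
result = [row for idx, part in enumerate(partitions) for row in drop_first_n(idx, iter(part))]
print(result)
```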

In [84]:
schema=['Page','Date','Pageviews','unique_views','session']

out_df=spark.createDataFrame(rdd2,schema)
out_df.show(10,truncate=0)

+----------------------------------------+----------+---------+------------+-------+
|Page                                    |Date      |Pageviews|unique_views|session|
+----------------------------------------+----------+---------+------------+-------+
|Guide to Install Spark                  |2020-08-17|1140     |986         |800    |
|Spark MAP vs FlatMap                    |2020-08-17|836      |800         |128    |
|Spark Architechture                     |2020-08-17|1569     |1345        |1400   |
|Azure Function for Mp3 to text          |2020-08-17|350      |245         |234    |
|Scala Vs Python                         |2020-08-17|200      |150         |130    |
|Spark Window Function                   |2020-08-17|789      |546         |560    |
|Natural Language Processing             |2020-08-17|467      |456         |100    |
|Spark Linear Interpolation - Time Series|2020-08-17|698      |345         |349    |
|Spark case statement                    |2020-08-17|234      |19

# Convert Pandas DataFrame into Spark DataFrame

#### Apache Arrow in Spark for Converting pandas df

Apache Arrow is an open-source, cross-platform, in-memory columnar data format. Spark uses it to transfer data efficiently between the Python process and the JVM, which speeds up conversions between pandas and Spark DataFrames. Arrow-based conversion is disabled by default and can be enabled with the following setting.

In [86]:
#Enable Apache Arrow
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled","true")

Arrow-based conversion does not support every Spark SQL data type, and the supported set depends on the Spark and PyArrow versions. If the Arrow conversion fails, Spark can fall back to the non-Arrow implementation; this behaviour is controlled by the fallback setting checked below.

In [87]:
# spark.sql.execution.arrow.fallback.enabled is the older, deprecated name of this setting
spark.conf.get("spark.sql.execution.arrow.pyspark.fallback.enabled")

'true'

**Note:** Even with Arrow enabled, converting a Spark DataFrame to pandas with `toPandas()` collects all the data on the driver, so a large DataFrame can exhaust driver memory. Reduce the data first, for example with `filter()`, `limit()`, or `sample()`, before calling `toPandas()`.

#### Cast Issues While Conversion

We can convert a pandas DataFrame to a Spark DataFrame using the `createDataFrame()` API, as shown below.

Code Syntax to create dataframe from pandas df:

`in_df=spark.createDataFrame(pandas_dataframe)`


However, if a pandas column holds values whose types Spark cannot reconcile (for example, strings mixed with numbers), `createDataFrame()` fails with a `TypeError` reporting the type mismatch.

##### Solution

There are two ways to convert the pandas DataFrame to a Spark DataFrame despite such mismatches:
* Cast all the pandas columns to strings using `astype(str)`.
* Define an explicit schema with `StructType()` and pass it to `createDataFrame()`.

##### Step 1: Reading Data

In [88]:
#import the required python package
import pandas as pd
import datetime

#Read the input csv file
in_pd=pd.read_csv('https://raw.githubusercontent.com/azar-s91/dataset/master/trans.csv')

#Display sample result from pandas df
in_pd.head()

  CustomerId CustomerName            dateTime  Amount discount Member
0       1001         Arun 2020-07-15 01:01:53 2465.22     10 %   True
1       1005       Barath 2020-07-13 12:15:33 8399.34      5 %   True
2       1003       Charle 2020-07-18 20:10:45 1234.88      3 %  False
3       1004        Gokul 2020-07-15 11:11:36  1690.0      1 %   True
4       1005        Messy 2020-07-18 15:11:43   160.0      3 %   True


In [89]:
in_pd.dtypes

CustomerId        int64
CustomerName     object
dateTime         object
Amount          float64
discount         object
Member             bool
dtype: object

##### Step 2

###### Method 1: Casting Before Conversion


In [96]:
#Create spark df from pandas df using astype()
in_df=spark.createDataFrame(in_pd.astype(str))
in_df.show()

+----------+------------+-------------------+-------+--------+------+
|CustomerId|CustomerName|           dateTime| Amount|discount|Member|
+----------+------------+-------------------+-------+--------+------+
|      1001|        Arun|2020-07-15 01:01:53|2465.22|    10 %|  True|
|      1005|      Barath|2020-07-13 12:15:33|8399.34|     5 %|  True|
|      1003|      Charle|2020-07-18 20:10:45|1234.88|     3 %| False|
|      1004|       Gokul|2020-07-15 11:11:36| 1690.0|     1 %|  True|
|      1005|       Messy|2020-07-18 15:11:43|  160.0|     3 %|  True|
|      1006|      Gerold|2020-07-08 14:16:53| 2546.0|     1 %|  True|
|      1007|      Parker|2020-07-04 17:13:33| 3456.0|     2 %| False|
|      1008|        Thor|2020-07-10 03:30:43| 8745.0|     5 %|  True|
|      1009|       Steve|2020-07-22 12:10:43|  143.0|     2 %|  True|
|      1010|        Mani|2020-07-27 19:40:23| 1865.0|     3 %|  True|
|      1011|      Cooper|2020-07-13 18:10:33| 1200.0|     1 %|  True|
|      1012|       P

In [97]:
#printSchema() used to display schema
in_df.printSchema()

root
 |-- CustomerId: string (nullable = true)
 |-- CustomerName: string (nullable = true)
 |-- dateTime: string (nullable = true)
 |-- Amount: string (nullable = true)
 |-- discount: string (nullable = true)
 |-- Member: string (nullable = true)



The conversion from the pandas DataFrame to a Spark DataFrame succeeds. However, the schema shows that every column is now of String type, so any aggregation on a numeric column would first require casting it back to an integer or decimal type. A better approach is to define the schema explicitly and use it for the conversion, as shown in the next step.

###### Method 2: Define Schema and Convert

In [91]:
#define the schema using StructType()
from pyspark.sql.types import *
pdsch=StructType([StructField("CustomerId",IntegerType(),True),
                  StructField("CustomerName",StringType(),True),
                  StructField("dateTime",StringType(),True),
                  StructField("Amount",FloatType(),True),
                  StructField("Discount",StringType(),True),
                  StructField("Member",BooleanType(),True),
                 ])

In [92]:
pdsch

StructType(List(StructField(CustomerId,IntegerType,true),StructField(CustomerName,StringType,true),StructField(dateTime,StringType,true),StructField(Amount,FloatType,true),StructField(Discount,StringType,true),StructField(Member,BooleanType,true)))

In [93]:
#Create dataframe using defined schema
in_df=spark.createDataFrame(in_pd,schema=pdsch)
in_df.show()

+----------+------------+-------------------+-------+--------+------+
|CustomerId|CustomerName|           dateTime| Amount|Discount|Member|
+----------+------------+-------------------+-------+--------+------+
|      1001|        Arun|2020-07-15 01:01:53|2465.22|    10 %|  true|
|      1005|      Barath|2020-07-13 12:15:33|8399.34|     5 %|  true|
|      1003|      Charle|2020-07-18 20:10:45|1234.88|     3 %| false|
|      1004|       Gokul|2020-07-15 11:11:36| 1690.0|     1 %|  true|
|      1005|       Messy|2020-07-18 15:11:43|  160.0|     3 %|  true|
|      1006|      Gerold|2020-07-08 14:16:53| 2546.0|     1 %|  true|
|      1007|      Parker|2020-07-04 17:13:33| 3456.0|     2 %| false|
|      1008|        Thor|2020-07-10 03:30:43| 8745.0|     5 %|  true|
|      1009|       Steve|2020-07-22 12:10:43|  143.0|     2 %|  true|
|      1010|        Mani|2020-07-27 19:40:23| 1865.0|     3 %|  true|
|      1011|      Cooper|2020-07-13 18:10:33| 1200.0|     1 %|  true|
|      1012|       P

In [95]:
#printSchema used to check the schema of spark df
in_df.printSchema()

root
 |-- CustomerId: integer (nullable = true)
 |-- CustomerName: string (nullable = true)
 |-- dateTime: string (nullable = true)
 |-- Amount: float (nullable = true)
 |-- Discount: string (nullable = true)
 |-- Member: boolean (nullable = true)



# Ambiguous Column name in Spark

Consider an input Spark DataFrame derived from a nested JSON file. You can download the sample dataset from this link: sample dataset. After the nested fields are flattened, the DataFrame contains two columns with the same name, `name`. Our requirement is to rename one of these duplicate (ambiguous) columns.

#### Impact with Ambiguous Column in Spark
A duplicate column name makes every reference to that column ambiguous: any transformation that refers to it by name fails with an `AnalysisException` stating that the reference is ambiguous.

#### Solution
Renaming one of the ambiguous columns solves the issue. Spark has no direct method to rename just one of two columns that share a name, so we use `df.columns` to find the duplicate names and their positions, rename them in the Python list, and then apply the full list of new names with `toDF()`.

In [None]:
#Read the input json file and flatten the data to replicate the use-case
df=spark.read.json('input1.json',multiLine=True)
df1=df.select("*","Delivery.*").drop("Delivery")
df1.show()

In [None]:
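# Positions of the first occurrence of every column name that appears more than once
df_cols = df1.columns
dup_positions = {df_cols.index(c) for c in df_cols if df_cols.count(c) > 1}

# Rename that first occurrence by appending a suffix (enough when a name appears exactly twice)
for pos in dup_positions:
    df_cols[pos] = df_cols[pos] + '_0'

# toDF() assigns the new names to the columns by position
df1 = df1.toDF(*df_cols)
df1.show()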
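The loop above renames only the first occurrence of each duplicated name, which is enough when a name appears exactly twice. As a more general alternative, here is a sketch of a hypothetical `dedupe_columns()` helper that keeps the first occurrence and appends `_1`, `_2` and so on to later repeats; its result could then be applied with `df1.toDF(*dedupe_columns(df1.columns))`. It does not check whether a generated name such as `name_1` already exists, and it compares names case-sensitively, whereas Spark resolves column names case-insensitively by default.

```python
from collections import Counter


def dedupe_columns(columns):
    """Give repeated column names a numeric suffix (hypothetical helper)."""
    seen = Counter()
    renamed = []
    for name in columns:
        # First occurrence keeps its name; later ones get _1, _2, ...
        renamed.append(f"{name}_{seen[name]}" if seen[name] else name)
        seen[name] += 1
    return renamed


print(dedupe_columns(["id", "name", "city", "name"]))
```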

# Find Duplicates in Spark

Consider a CSV file that contains some duplicate records, as shown in the output below. Our requirement is to find the duplicate rows in a Spark DataFrame and report them.

**Solution**:

We can find the duplicate rows in two ways:
* PySpark GroupBy
* PySpark Window Rank Function

In [163]:
#Read CSV File
education_csv = 'data/education.csv'
in_df=spark.read.csv(education_csv,header=True)
in_df.show()

+------+---+---------+----+
|  Name|Age|Education|Year|
+------+---+---------+----+
|   RAM| 28|       BE|2012|
|Rakesh| 53|      MBA|1985|
| Madhu| 22|    B.Com|2018|
|Rakesh| 53|      MBA|1985|
|  Bill| 32|       ME|2007|
| Madhu| 22|    B.Com|2018|
|Rakesh| 53|      MBA|1985|
|   RAM| 25|       MA|2012|
+------+---+---------+----+



#### Approach 1: GroupBy

In [164]:
in_df.groupBy("Name", "Age", "Education", "Year") \
.count() \
.where("count > 1") \
.drop("count").show()

+------+---+---------+----+
|  Name|Age|Education|Year|
+------+---+---------+----+
|Rakesh| 53|      MBA|1985|
| Madhu| 22|    B.Com|2018|
+------+---+---------+----+



#### Approach 2: Window Ranking Function

In [165]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col,row_number

In [166]:
win = Window.partitionBy("name").orderBy(col("Year").desc())

In [175]:
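# Rows numbered > 1 are duplicates *by Name*: RAM (25, MA) appears although it differs from RAM (28, BE).
# Use Window.partitionBy("Name", "Age", "Education", "Year") to flag only exact duplicate rows.
in_df.withColumn("rnk", row_number().over(win)).filter("rnk > 1").show()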

+------+---+---------+----+---+
|  Name|Age|Education|Year|rnk|
+------+---+---------+----+---+
| Madhu| 22|    B.Com|2018|  2|
|   RAM| 25|       MA|2012|  2|
|Rakesh| 53|      MBA|1985|  2|
|Rakesh| 53|      MBA|1985|  3|
+------+---+---------+----+---+



In [176]:
# Keep a single copy of each duplicate record
in_df.withColumn("rnk", row_number().over(win)).filter("rnk > 1").drop("rnk").dropDuplicates().show()

+------+---+---------+----+
|  Name|Age|Education|Year|
+------+---+---------+----+
| Madhu| 22|    B.Com|2018|
|   RAM| 25|       MA|2012|
|Rakesh| 53|      MBA|1985|
+------+---+---------+----+



In [177]:
# Keep one row per Name (which row survives for a given name is not guaranteed)
in_df.dropDuplicates(["Name"]).show()

+------+---+---------+----+
|  Name|Age|Education|Year|
+------+---+---------+----+
|  Bill| 32|       ME|2007|
| Madhu| 22|    B.Com|2018|
|   RAM| 28|       BE|2012|
|Rakesh| 53|      MBA|1985|
+------+---+---------+----+
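To see why the two approaches disagree on `RAM`, both can be mimicked in plain Python on the eight rows read above (values typed in by hand, with Age and Year as integers). `Counter` plays the role of `groupBy(...).count()`, and the hypothetical `numbered_after_first()` numbers rows within each partition key in input order, standing in for `row_number()`. Spark orders by `Year` instead, which is a tie for the two `RAM` rows, so which of them Spark numbers first is not guaranteed.

```python
from collections import Counter, defaultdict

rows = [
    ("RAM", 28, "BE", 2012), ("Rakesh", 53, "MBA", 1985), ("Madhu", 22, "B.Com", 2018),
    ("Rakesh", 53, "MBA", 1985), ("Bill", 32, "ME", 2007), ("Madhu", 22, "B.Com", 2018),
    ("Rakesh", 53, "MBA", 1985), ("RAM", 25, "MA", 2012),
]

# Approach 1: group identical rows and keep those seen more than once
group_by_dups = [row for row, n in Counter(rows).items() if n > 1]


def numbered_after_first(rows, key):
    """Return rows whose running number within their key is > 1 (hypothetical helper)."""
    seen = defaultdict(int)
    flagged = []
    for row in rows:
        seen[key(row)] += 1
        if seen[key(row)] > 1:
            flagged.append(row)
    return flagged


# Approach 2 with the window partitioned by Name only, and by all four columns
by_name = numbered_after_first(rows, key=lambda r: r[0])
by_all_columns = numbered_after_first(rows, key=lambda r: r)

print(group_by_dups)
print(by_name)
print(by_all_columns)
```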



# Filter Data in Spark

In [181]:
# Filter records where year > 2010
in_df.filter("Year > 2010").show()

+-----+---+---------+----+
| Name|Age|Education|Year|
+-----+---+---------+----+
|  RAM| 28|       BE|2012|
|Madhu| 22|    B.Com|2018|
|Madhu| 22|    B.Com|2018|
|  RAM| 25|       MA|2012|
+-----+---+---------+----+



In [183]:
# where() is an alias for filter()
in_df.where("Year > 2010").show()

+-----+---+---------+----+
| Name|Age|Education|Year|
+-----+---+---------+----+
|  RAM| 28|       BE|2012|
|Madhu| 22|    B.Com|2018|
|Madhu| 22|    B.Com|2018|
|  RAM| 25|       MA|2012|
+-----+---+---------+----+



# Select First Row of Each Group in Spark

In [191]:
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, first

In [189]:
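# Ordering by Name (the partition key) makes the "first" row per group arbitrary; order by e.g. Year to control it
win = Window.partitionBy("Name").orderBy("Name")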

In [190]:
in_df.withColumn("rnk", row_number().over(win)).show()

+------+---+---------+----+---+
|  Name|Age|Education|Year|rnk|
+------+---+---------+----+---+
|  Bill| 32|       ME|2007|  1|
| Madhu| 22|    B.Com|2018|  1|
| Madhu| 22|    B.Com|2018|  2|
|   RAM| 28|       BE|2012|  1|
|   RAM| 25|       MA|2012|  2|
|Rakesh| 53|      MBA|1985|  1|
|Rakesh| 53|      MBA|1985|  2|
|Rakesh| 53|      MBA|1985|  3|
+------+---+---------+----+---+



In [197]:
in_df.withColumn("rnk", row_number().over(win)).filter("rnk < 2").show()

+------+---+---------+----+---+
|  Name|Age|Education|Year|rnk|
+------+---+---------+----+---+
|  Bill| 32|       ME|2007|  1|
| Madhu| 22|    B.Com|2018|  1|
|   RAM| 28|       BE|2012|  1|
|Rakesh| 53|      MBA|1985|  1|
+------+---+---------+----+---+
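Picking the first row of each group is only well defined once "first" is defined by an ordering. Here is a plain-Python sketch of the idea, assuming (hypothetically) that the most recent `Year` should win and that `Age` breaks ties; in Spark this would correspond to ordering the window by `col("Year").desc(), col("Age").desc()` instead of by `Name`. The helper `first_per_group()` is hypothetical.

```python
rows = [
    ("RAM", 28, "BE", 2012), ("Rakesh", 53, "MBA", 1985), ("Madhu", 22, "B.Com", 2018),
    ("Rakesh", 53, "MBA", 1985), ("Bill", 32, "ME", 2007), ("Madhu", 22, "B.Com", 2018),
    ("Rakesh", 53, "MBA", 1985), ("RAM", 25, "MA", 2012),
]


def first_per_group(rows, key, order_key):
    """Keep the first row of each group after sorting by order_key (hypothetical helper)."""
    first = {}
    for row in sorted(rows, key=order_key):
        first.setdefault(key(row), row)  # only the first row seen per key is stored
    return first


# Newest Year first, then highest Age as a tie-breaker
latest = first_per_group(rows, key=lambda r: r[0], order_key=lambda r: (-r[3], -r[1]))
for name, row in sorted(latest.items()):
    print(name, row)
```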





# Split and Merge Columns in Spark Dataframe

# Read MultiLine JSON in Spark

If each record in the input file is on a single line, `spark.read.json` gives the expected output. If a single record spans multiple lines, however, the default reader cannot parse it and the resulting DataFrame contains only a `_corrupt_record` column.

To read such a file, set the `multiLine` parameter to `True` when reading the JSON file, as in the following snippet.

In [148]:
input_json = 'data/input1.json'

In [152]:
input_df = spark.read.json(input_json, multiLine=True)

In [153]:
input_df.show()

+--------------------+------+
|           Education|  name|
+--------------------+------+
|[{BE, 2018}, {ME,...|Clarke|
|        [{BE, 2015}]|  John|
+--------------------+------+
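The difference between the two reading modes can be illustrated with Python's `json` module. The sketch below approximates Spark's behaviour rather than reproducing its implementation: by default each line is parsed as a separate JSON document and lines that fail go to a corrupt list (Spark keeps them in `_corrupt_record`), while `multiLine=True` parses the whole file as one document, which may be a single object or an array of objects. The sample record is made up in the shape of `input1.json`, and both helper functions are hypothetical.

```python
import json

# Made-up record in the same shape as input1.json, spread over several lines
text = """{
  "name": "John",
  "Education": [{"Qualification": "BE", "year": 2015}]
}"""


def read_line_by_line(text):
    """Default mode: one JSON document per line."""
    records, corrupt = [], []
    for line in text.splitlines():
        if not line.strip():
            continue
        try:
            records.append(json.loads(line))
        except json.JSONDecodeError:
            corrupt.append(line)  # Spark would surface these as _corrupt_record
    return records, corrupt


def read_multi_line(text):
    """multiLine=True: the whole file is one JSON document (object or array)."""
    doc = json.loads(text)
    return doc if isinstance(doc, list) else [doc]


records, corrupt = read_line_by_line(text)
print(len(records), len(corrupt))
print(read_multi_line(text))
```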



#### Explode Array Column

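The `Education` column is an array of structs. We flatten it with the SQL function `explode()`, which returns one row per array element and repeats the other selected columns (here `name`). Rows whose array is null or empty are dropped; `explode_outer()` keeps them with a null value.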

In [151]:
from pyspark.sql.functions import explode

In [158]:
flat = input_df.select('name', explode('Education').alias('education_flat'))

In [159]:
flat.show()

+------+--------------+
|  name|education_flat|
+------+--------------+
|Clarke|    {BE, 2018}|
|Clarke|    {ME, 2020}|
|  John|    {BE, 2015}|
+------+--------------+



In [160]:
flat.printSchema()

root
 |-- name: string (nullable = true)
 |-- education_flat: struct (nullable = true)
 |    |-- Qualification: string (nullable = true)
 |    |-- year: long (nullable = true)



#### Flatten Struct Columns

Select the fields of the `education_flat` struct as two separate columns, `Qualification` and `year`.

In [161]:
out_df=flat.select('name','education_flat.Qualification', 'education_flat.year' )

In [162]:
out_df.show()

+------+-------------+----+
|  name|Qualification|year|
+------+-------------+----+
|Clarke|           BE|2018|
|Clarke|           ME|2020|
|  John|           BE|2015|
+------+-------------+----+
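Both steps, exploding the array and then pulling the struct fields into their own columns, can be sketched in plain Python on records typed in from the output above. The hypothetical `explode_rows()` is not Spark code; the extra record `Ann` with an empty array is also hypothetical and shows that such a record produces no rows, matching `explode()` rather than `explode_outer()`.

```python
records = [
    {"name": "Clarke", "Education": [{"Qualification": "BE", "year": 2018},
                                     {"Qualification": "ME", "year": 2020}]},
    {"name": "John", "Education": [{"Qualification": "BE", "year": 2015}]},
    {"name": "Ann", "Education": []},  # hypothetical record with an empty array
]


def explode_rows(records, array_field, keep_field):
    """One output row per array element, like select(keep_field, explode(array_field))."""
    return [(rec[keep_field], element)
            for rec in records
            for element in (rec[array_field] or [])]


exploded = explode_rows(records, "Education", "name")
# Flatten the struct: pull its fields out into separate columns
out = [(name, edu["Qualification"], edu["year"]) for name, edu in exploded]
for row in out:
    print(row)
```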



# Cast String Datatype to Date Timestamp in Spark

# Define a Schema with StructType and StructField

Consider creating a Spark DataFrame from a CSV file that has no header row. Because the file has no header, Spark assigns the default column names _c0, _c1, and so on. These names carry no meaning and make queries hard to write and read, so it is better to create the DataFrame with meaningful column names.

In [99]:
df1 = spark.read.csv('data/ecom_user.csv', header=False)

In [100]:
df1.show()

+-------+---------+---------+-------+
|    _c0|      _c1|      _c2|    _c3|
+-------+---------+---------+-------+
| Gaurav|  T-shirt|    Delhi|1234567|
|Bharath|Headphone|Bangalore|5738612|
+-------+---------+---------+-------+



#### Solution to Infer / Define Schema in PySpark

We can apply a schema to the DataFrame using `StructType`.

A `StructType` object defines the schema of a Spark DataFrame. It contains a list of `StructField` objects, each of which specifies a column's name, data type, and a flag indicating whether the column is nullable. To use it, import the types from `pyspark.sql.types`, define a `StructField` for every column, build a `StructType` from that list, and pass it to the reader through the `schema` parameter, as shown in the code below.

In [101]:
from pyspark.sql.types import *

In [113]:
# Define Schema fields using StructField
data_schema=[ StructField("NAME", StringType(), True),
              StructField("Product", StringType(), True),
              StructField("City", StringType(), True),
              StructField("MOBILE", StringType(), True) ]

In [114]:
# Create the schema using StructType, which takes the StructFields as input
struct_schema=StructType(fields=data_schema)
print(struct_schema)

StructType(List(StructField(NAME,StringType,true),StructField(Product,StringType,true),StructField(City,StringType,true),StructField(MOBILE,StringType,true)))


In [115]:
# Pass the schema to the CSV reader so the columns get these names and types
df1 = spark.read.csv('data/ecom_user.csv', header=False, schema=struct_schema)

In [116]:
df1.show()

+-------+---------+---------+-------+
|   NAME|  Product|     City| MOBILE|
+-------+---------+---------+-------+
| Gaurav|  T-shirt|    Delhi|1234567|
|Bharath|Headphone|Bangalore|5738612|
+-------+---------+---------+-------+
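What the schema does for a headerless file can be sketched in plain Python with the `csv` module: names are assigned to the fields by position and each value is converted to its declared type. This is a simplified model under stated assumptions (the `(name, type)` pairs stand in for the `StructField`s, and malformed rows and nullability are not modelled), not how Spark parses CSV; the helper `read_with_schema()` is hypothetical.

```python
import csv
import io

# Hypothetical stand-in for the StructFields above: (column name, Python type)
schema = [("NAME", str), ("Product", str), ("City", str), ("MOBILE", str)]

# Headerless CSV text with the two rows shown above
data = "Gaurav,T-shirt,Delhi,1234567\nBharath,Headphone,Bangalore,5738612\n"


def read_with_schema(text, schema):
    """Attach column names by position and convert each value to its declared type."""
    rows = []
    for values in csv.reader(io.StringIO(text)):
        rows.append({name: cast(raw) for (name, cast), raw in zip(schema, values)})
    return rows


rows = read_with_schema(data, schema)
print(rows[0])
```

One likely benefit of declaring `MOBILE` as a string, as the schema above does, is that phone numbers are kept verbatim instead of being converted to numbers.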

