# ETL Operations using SPARK 2.x

1) Change the timestamp formats to yyyy-MM-dd HH:mm:ss (remove the millisecond part)

2) Generate a new column "years_left" showing the years left until passport expiration

3) Check the number of invalid mobile numbers (having 11 digits) and create a new column giving the status "invalid" or "valid"

4) Create a new Column with Network status (BNZ or Others) based on IP address starting with 190-199

5) Find invalid email addresses, drop those records and get a new DF 'gooddata'

6) Find People having SAME fname, lname and email. Drop these Duplicate Records

7) Create a new column 'priority' saying CRITICAL if fewer than 3 years are left before the passport expires

8) Create a new principal from the first 3 letters of fname, the full lname and the suffix "@HADOOP.COM"

9) Use of important pyspark.sql.functions: lit, replace, when, otherwise, substr, fill, isnan, isnull, ltrim, rtrim, udf, upper, lower

10) Save the final data to Hive Dynamically Partitioned Table


The data file used below was generated using https://github.com/Nrsh13/Random_Data_Generator/blob/master/random_data_generator.py. The generated data file is available in the data folder of this repo. Load this file into the /tmp directory in HDFS.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

In [2]:
myspark=SparkSession.builder.appName("Spark Operations").master("yarn").enableHiveSupport().getOrCreate()

In [3]:
data = myspark.read.format("csv").options(header=True, inferSchema=True, sep="\t",
    dateFormat="yyyy-MM-dd", timestampFormat="yyyy-MM-dd HH:mm:ss.ssssss", ignoreLeadingWhiteSpace=True,
    ignoreTrailingWhiteSpace=True, path="/tmp/etl_sampledata.tsv").load()

In [4]:
data.printSchema()

root
 |-- fname: string (nullable = true)
 |-- lname: string (nullable = true)
 |-- principal: string (nullable = true)
 |-- emailid: string (nullable = true)
 |-- mobile: long (nullable = true)
 |-- passport_make: timestamp (nullable = true)
 |-- passport_expire: timestamp (nullable = true)
 |-- ipaddress: string (nullable = true)



In [5]:
data.count()

10000

### 1. Change the timestamp formats to yyyy-MM-dd HH:mm:ss (remove the millisecond part) 

    Functions Used: F.from_unixtime(), F.unix_timestamp()

In [6]:
data = data.withColumn("passport_make", F.from_unixtime(F.unix_timestamp(data.passport_make, format='yyyy-MM-dd HH:mm:ss')))\
        .withColumn("passport_expire", F.from_unixtime(F.unix_timestamp(data.passport_expire, format='yyyy-MM-dd HH:mm:ss')))
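
The same truncation can be checked outside Spark. This is a minimal plain-Python sketch, not the notebook's method: it assumes the raw values look like `2010-10-17 20:50:19.123456` (the fractional digits are made up for illustration), and `strip_fraction` is a hypothetical helper name.

```python
from datetime import datetime


def strip_fraction(ts: str) -> str:
    """Drop the fractional-second part of a timestamp string."""
    # %f accepts between 1 and 6 fractional digits.
    parsed = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S.%f")
    return parsed.strftime("%Y-%m-%d %H:%M:%S")


# Illustrative input only; the fraction is not taken from the data file.
print(strip_fraction("2010-10-17 20:50:19.123456"))
```

As the later `printSchema()` output shows, `F.from_unixtime()` returns a string column, so after this step the two passport columns hold formatted strings rather than timestamps.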

### 2. Generate a new column "years_left" showing the years left until passport expiration

    Functions Used: F.datediff(), F.current_timestamp(), F.round()

In [7]:
data = data.withColumn("years_left", F.round(F.datediff(data.passport_expire,F.current_timestamp())/365,2))
data.select(data.passport_make,data.passport_expire,data.years_left).show(5,False)

+-------------------+-------------------+----------+
|passport_make      |passport_expire    |years_left|
+-------------------+-------------------+----------+
|2010-10-17 20:50:19|2020-10-17 20:50:19|2.63      |
|2015-10-16 20:54:46|2025-10-16 20:54:46|7.63      |
|2013-03-18 20:55:59|2023-03-18 20:55:59|5.05      |
|2013-12-17 20:56:59|2023-12-17 20:56:59|5.8       |
|2016-01-15 20:57:53|2026-01-15 20:57:53|7.88      |
+-------------------+-------------------+----------+
only showing top 5 rows
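
The arithmetic behind `years_left` is a day difference divided by 365 and rounded to two decimals. Here is a plain-Python sketch of that rule, useful for spot-checking individual rows; `years_left` as a function name and the sample dates are assumptions for illustration.

```python
from datetime import date


def years_left(expire: date, today: date) -> float:
    """Approximate years until expiry using a fixed 365-day year."""
    days = (expire - today).days  # same idea as F.datediff(end, start)
    return round(days / 365, 2)


# A leap year spans 366 days, so the fixed 365-day year is only an approximation.
print(years_left(date(2021, 1, 1), date(2020, 1, 1)))
# An already expired passport yields a negative value.
print(years_left(date(2020, 1, 1), date(2021, 1, 1)))
```

Because the value depends on the current date, rerunning the notebook later produces different `years_left` values than the output shown above.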



### 3. Check the number of invalid mobile numbers (having 11 digits) and create a new column giving the status "invalid" or "valid"

    Functions Used: F.length(), F.when().otherwise(), F.substring(), df.groupBy()

In [8]:
data = data.withColumn("mobile_status",F.when(F.length(data.mobile) <= 10, "valid").otherwise("invalid"))

data.select("mobile","mobile_status").show(5,False)

+-----------+-------------+
|mobile     |mobile_status|
+-----------+-------------+
|99840995521|invalid      |
|9863180471 |valid        |
|9801241267 |valid        |
|9898312853 |valid        |
|9846430220 |valid        |
+-----------+-------------+
only showing top 5 rows



How many invalid Mobile numbers were there?

In [9]:
data.groupby("mobile_status").count().show()

+-------------+-----+
|mobile_status|count|
+-------------+-----+
|        valid| 8000|
|      invalid| 2000|
+-------------+-----+



In [10]:
data.select("mobile").filter(F.length(data.mobile) == 11).count()

2000

Clean the Mobile Number by removing 11th digit

In [11]:
data = data.withColumn("mobile", F.substring("mobile", 1, 10))

In [12]:
data.select("mobile").filter(F.length(data.mobile) == 11).count()

0
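
The validation and cleaning rules above amount to a length check and a prefix slice on the number's string form. This plain-Python sketch shows both, using the first two mobile numbers from the output above; the helper names are hypothetical.

```python
def mobile_status(mobile: int) -> str:
    """Flag numbers with more than 10 digits as invalid."""
    return "valid" if len(str(mobile)) <= 10 else "invalid"


def clean_mobile(mobile: int) -> str:
    """Keep only the first 10 digits, like F.substring(col, 1, 10)."""
    return str(mobile)[:10]


for number in (99840995521, 9863180471):
    print(number, mobile_status(number), clean_mobile(number))
```

The cleaned value is a string, which matches the later schema where `mobile` changes from long to string after `F.substring()` is applied.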

### 4. Create a new Column with Network status (BNZ or Others) based on IP address starting with 190-199

    Functions Used: F.when().otherwise(), F.substring_index()

In [13]:
data = data.withColumn("network", \
            F.when(((F.substring_index("ipaddress", ".",1)) < 200) & ((F.substring_index("ipaddress", ".",1)) >= 190), "BNZ")\
            .otherwise("others"))
data.select("ipaddress","network").filter(data.network == "BNZ").show(5)
#data.select(F.substring_index("ipaddress", ".",1)).show()

+---------------+-------+
|      ipaddress|network|
+---------------+-------+
|  194.94.208.22|    BNZ|
| 190.163.73.208|    BNZ|
| 196.136.153.24|    BNZ|
|191.106.144.130|    BNZ|
|192.192.148.148|    BNZ|
+---------------+-------+
only showing top 5 rows
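
The classification rule can be sketched in plain Python as follows: take the text before the first dot (what `F.substring_index(col, ".", 1)` returns) and test whether it falls in the range 190 to 199. The function name `network_for` and the non-BNZ sample addresses are assumptions for illustration.

```python
def network_for(ip: str) -> str:
    """Return 'BNZ' when the first octet is between 190 and 199 inclusive."""
    first_octet = int(ip.split(".", 1)[0])  # explicit cast instead of an implicit one
    return "BNZ" if 190 <= first_octet < 200 else "others"


print(network_for("194.94.208.22"))  # address taken from the output above
print(network_for("10.0.0.1"))       # made-up address outside the range
```

In the Spark expression the string octet is compared to integers directly and Spark casts it implicitly; an explicit cast such as `.cast("int")` would make that intent visible.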



In [14]:
data.printSchema()

root
 |-- fname: string (nullable = true)
 |-- lname: string (nullable = true)
 |-- principal: string (nullable = true)
 |-- emailid: string (nullable = true)
 |-- mobile: string (nullable = true)
 |-- passport_make: string (nullable = true)
 |-- passport_expire: string (nullable = true)
 |-- ipaddress: string (nullable = true)
 |-- years_left: double (nullable = true)
 |-- mobile_status: string (nullable = false)
 |-- network: string (nullable = false)



### 5. Find invalid email addresses, drop those records and get a new DF 'gooddata'

Invalid emails are those that contain @#, @@, ## or #, or that contain no '@' at all.

    Functions Used: Column.like(), F.instr(), F.udf(), df.drop()

What is the Number of Invalid Emails?

In [15]:
data.select("emailid").filter(data.emailid.like("%@@%") \
                | data.emailid.like("%@#%") | data.emailid.like("%##%") \
                | data.emailid.like("%#%") | (F.instr(data.emailid,"@") == 0 )).count()

3127

Finding All invalid Emails using UDF

In [16]:
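import re
def validate_email(email):
    # Treat NULL emails as invalid instead of raising a TypeError in re.match()
    if email is None:
        return False
    # Optional surrounding quotes, a local part, one '@', then domain.tld
    mypat = r"\"?([-a-zA-Z0-9.`?{}_]+@\w+\.\w+)\"?"
    pattern = re.compile(mypat)
    # re.match() anchors at the start of the string only
    return pattern.match(email) is not None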

In [17]:
validate_email_udf = F.udf(validate_email, T.StringType())

In [18]:
data = data.withColumn("email_status", validate_email_udf(data.emailid))

In [19]:
data.select("emailid","email_status").filter(data.email_status == "false").count()

3127

In [20]:
data.select("emailid","email_status").filter(data.email_status == "false").show(5,False)

+--------------------------+------------+
|emailid                   |email_status|
+--------------------------+------------+
|Andrea_Martinezhotmail.com|false       |
|Nathan_gupta@#tcs.com     |false       |
|Robert_oorehotmail.com    |false       |
|naresh_Rogers#gmail.com   |false       |
|akash_Lewis@@yahoo.com    |false       |
+--------------------------+------------+
only showing top 5 rows
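
Before registering a function as a UDF, it can help to exercise it locally on a few known values. The sketch below redefines the validator with the same pattern and runs it against the five invalid addresses shown above. The `None` guard and the addresses at `example.com` are assumptions added for illustration.

```python
import re


def validate_email(email):
    """Return True when the address matches the notebook's pattern."""
    if email is None:  # assumed guard so NULL values do not raise
        return False
    pattern = re.compile(r"\"?([-a-zA-Z0-9.`?{}_]+@\w+\.\w+)\"?")
    return pattern.match(email) is not None


invalid_samples = [
    "Andrea_Martinezhotmail.com",
    "Nathan_gupta@#tcs.com",
    "Robert_oorehotmail.com",
    "naresh_Rogers#gmail.com",
    "akash_Lewis@@yahoo.com",
]
for address in invalid_samples:
    print(address, validate_email(address))

# Hypothetical well-formed address for comparison.
print(validate_email("some_user@example.com"))
```

Note that `match()` only anchors at the start, so trailing junk after a valid prefix (for example `user@example.com#junk`) would still pass; the pattern would need a trailing `$` or `re.fullmatch()` to reject such input.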



Keep only the records with valid email addresses and drop the temporary columns mobile_status and email_status.

In [21]:
gooddata = data.filter(data.email_status == "true").drop("mobile_status").drop("email_status")

In [22]:
gooddata.count()

6873

In [23]:
gooddata.printSchema()

root
 |-- fname: string (nullable = true)
 |-- lname: string (nullable = true)
 |-- principal: string (nullable = true)
 |-- emailid: string (nullable = true)
 |-- mobile: string (nullable = true)
 |-- passport_make: string (nullable = true)
 |-- passport_expire: string (nullable = true)
 |-- ipaddress: string (nullable = true)
 |-- years_left: double (nullable = true)
 |-- network: string (nullable = false)



### 6. Find People having SAME fname, lname and email. Drop these Duplicate Records

We use fname, lname and emailid together as the key that decides whether a record is a duplicate.

    Functions Used: df.dropDuplicates(), F.collect_list(), F.size(), df.groupBy().agg(), df.distinct()

Counting Distinct Records out of Total Good Records - 6873

In [24]:
#gooddata.groupBy("fname","lname","emailid").count().sort("count").show()
gooddata.createOrReplaceTempView("mytable")
myspark.sql("SELECT DISTINCT (fname, lname, emailid) FROM mytable").count()

6708

Dropping duplicate records using dropDuplicates()

In [25]:
gooddata = gooddata.dropDuplicates(["fname","lname", "emailid"])

Check if there are still any Duplicate records

In [26]:
gooddata.groupBy("fname","lname","emailid").agg(F.collect_list("emailid").alias("count"))\
        .where(F.size("count") > 1).count()

0

So we are left with 6708 unique records.

In [27]:
gooddata.select("fname", "lname","emailid").distinct().count()

6708
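
Key-based de-duplication can be sketched in plain Python like this: rows are compared only on the chosen key columns, and one row per key survives. The sample rows and the helper name are made up for illustration. This sketch keeps the first row seen, whereas Spark's `dropDuplicates()` does not guarantee which of the duplicate rows is kept.

```python
def drop_duplicates(rows, keys):
    """Keep the first row for each distinct combination of key columns."""
    seen = set()
    unique = []
    for row in rows:
        key = tuple(row[k] for k in keys)
        if key not in seen:
            seen.add(key)
            unique.append(row)
    return unique


# Hypothetical rows: the first two share fname, lname and emailid.
rows = [
    {"fname": "Frank", "lname": "Campbell", "emailid": "f@example.com", "mobile": "1"},
    {"fname": "Frank", "lname": "Campbell", "emailid": "f@example.com", "mobile": "2"},
    {"fname": "Henry", "lname": "Howard", "emailid": "h@example.com", "mobile": "3"},
]
print(len(drop_duplicates(rows, ["fname", "lname", "emailid"])))
```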

### 7. Create a new column 'priority' saying CRITICAL if fewer than 3 years are left before the passport expires

    Functions Used: F.when(), F.lit(), df.fillna() {alias for df.na.fill()}

In [28]:
gooddata = gooddata.withColumn("priority",F.when(gooddata.years_left < 3 , F.lit("CRITICAL"))).fillna("LOW", subset=["priority"])
gooddata.select("years_left","priority").show(5,False)


+----------+--------+
|years_left|priority|
+----------+--------+
|8.3       |LOW     |
|0.55      |CRITICAL|
|5.97      |LOW     |
|2.38      |CRITICAL|
|2.97      |CRITICAL|
+----------+--------+
only showing top 5 rows
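
The rule in the cell above, with its threshold of 3 years, reduces to a two-way branch in which missing values fall through to "LOW". Here is a plain-Python sketch checked against values from the output above; the function name is hypothetical.

```python
from typing import Optional


def priority(years_left: Optional[float]) -> str:
    """CRITICAL when fewer than 3 years remain, otherwise LOW."""
    # F.when() without otherwise() yields NULL when the condition is not
    # met (or is NULL); the fill step then turns that NULL into "LOW".
    if years_left is not None and years_left < 3:
        return "CRITICAL"
    return "LOW"


for value in (8.3, 0.55, 5.97, 2.38, 2.97):
    print(value, priority(value))
```

In Spark the same result can be written in one expression with `F.when(...).otherwise("LOW")`, which avoids the separate fill step.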



### 8. Creating a new principal from the first 3 letters of fname, the full lname and the suffix "@HADOOP.COM"

    Functions Used: F.concat(), F.substring(), F.lower(), F.lit()

In [29]:
gooddata = gooddata.withColumn("principal", F.concat(F.lower(F.substring(gooddata["fname"],1,3))\
        , F.lower(gooddata["lname"]), F.lit("@HADOOP.COM")))
gooddata.select("fname","lname","principal").show(5,False)

+-------+--------+----------------------+
|fname  |lname   |principal             |
+-------+--------+----------------------+
|Frank  |Campbell|fracampbell@HADOOP.COM|
|Henry  |Howard  |henhoward@HADOOP.COM  |
|tom    |rown    |tomrown@HADOOP.COM    |
|stephen|oore    |steoore@HADOOP.COM    |
|Richard|Cooper  |riccooper@HADOOP.COM  |
+-------+--------+----------------------+
only showing top 5 rows
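
The string construction can be verified in plain Python against the rows shown above: lower-case the first three letters of fname, lower-case lname, and append the realm. The function name `make_principal` is an assumption for illustration.

```python
def make_principal(fname: str, lname: str, realm: str = "@HADOOP.COM") -> str:
    """Build a principal from fname[:3] + lname (both lower-cased) + realm."""
    return fname[:3].lower() + lname.lower() + realm


# Names taken from the output above.
print(make_principal("Frank", "Campbell"))
print(make_principal("tom", "rown"))
```

Note that `F.concat()` returns NULL if any input is NULL, so a missing fname or lname would produce a NULL principal in Spark, while this sketch assumes both values are present.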



### 9. Use of pyspark.sql.functions like lit , replace, when, otherwise, substr, fill, isnan, isnull, ltrim, rtrim, udf, upper,lower, Window.

    Several of these (lit, when, otherwise, substring, fillna, udf, lower) are used in the previous examples.

### 10. Save the final data to Hive Dynamically Partitioned Table

#### i. Create the Table in Advance and save the data directly to HDFS location using df.write.save()

Creating a Dynamic partition Table default.etl_operations_result partitioned on "network" and "priority"

In [30]:
gooddata.createOrReplaceTempView("mytable")

myspark.conf.set("hive.exec.dynamic.partition", "true")
myspark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

In [31]:
myspark.sql("DROP TABLE IF EXISTS default.etl_operations_result")

myspark.sql("CREATE TABLE IF NOT EXISTS default.etl_operations_result( \
            fname string, lname string,`principal` string,`emailid` string, \
             `mobile` string, `passport_make` string, `passport_expire` string, `ipaddress` string, `years_left` double) \
            PARTITIONED BY (network string, priority string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' \
            LOCATION '/tmp/etl_operations_result'")

DataFrame[]

Writing the Data to "/tmp/etl_operations_result"

In [32]:
myspark.conf.set("spark.sql.shuffle.partitions",5)

gooddata.write.save(path = "/tmp/etl_operations_result", format="csv", mode="overwrite", partitionBy=("network","priority"))
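
A partitioned write produces Hive-style `column=value` subdirectories under the target path, one level per partition column, in the order given to `partitionBy`. The sketch below only builds the expected directory name for one row. The helper name is hypothetical and the partition values are examples.

```python
def partition_dir(base: str, network: str, priority: str) -> str:
    """Expected Hive-style partition directory for one (network, priority) pair."""
    return f"{base}/network={network}/priority={priority}"


print(partition_dir("/tmp/etl_operations_result", "BNZ", "CRITICAL"))
```

Because the files are written straight to the table location, the metastore does not yet know about these directories; the `MSCK REPAIR TABLE` statement in the next cell registers them as partitions.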

In [33]:
myspark.sql("MSCK REPAIR TABLE default.etl_operations_result")
myspark.sql("SELECT COUNT(*) FROM default.etl_operations_result").show()

+--------+
|count(1)|
+--------+
|    6708|
+--------+



### OR

#### ii. Create the Table in Advance and load it using the INSERT OVERWRITE command

Make sure the column order matches the table definition and that the partition columns 'network' and 'priority' come last in the SELECT list.

In [34]:
gooddata.createOrReplaceTempView("mytable")

myspark.conf.set("hive.exec.dynamic.partition", "true")
myspark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")

In [35]:
myspark.sql("INSERT OVERWRITE TABLE default.etl_operations_result PARTITION(network, priority)\
            SELECT fname , lname ,principal ,emailid, mobile, passport_make, passport_expire,\
            ipaddress, years_left, network, priority \
            FROM mytable\
            ")

DataFrame[]

In [36]:
myspark.sql("SELECT COUNT(*) FROM default.etl_operations_result").show()

+--------+
|count(1)|
+--------+
|    6708|
+--------+



# What's Next

1) To download this single notebook, open this file in my GitHub account, copy the URL and paste it into http://nbviewer.jupyter.org/. The download button is in the top right corner.

2) Open your Jupyter Notebook home page and upload the file using the "Upload" button.