DATA ENGINEER - PYTHON PYSPARK

# Big Data Analysis on a Real-World Dataset using PySpark

In [1]:
# Initialising the Spark session for this notebook
# Importing the required modules from pyspark
import pyspark
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
sc = SparkContext('local')
spark = SparkSession(sc)
sqlContext = SQLContext(sc)

In [2]:
import pandas as pd
import json
from pandas import json_normalize  # top-level location since pandas 1.0; pandas.io.json.json_normalize is deprecated

In [3]:
# Enable Apache Arrow for faster pandas <-> Spark DataFrame conversion
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [4]:
#import CSVs here
salesteam_pd = pd.read_csv("./sales_teams.csv")
salespipeline_pd = pd.read_csv("./sales_pipeline.csv")
products_pd = pd.read_csv("./products.csv")
accounts_pd = pd.read_csv("./accounts.csv")
clicks_pd = pd.read_csv("./clicks.csv")

#import JSONs here
orchestra_json = pd.read_json("./Orchestra.json")



In [5]:
salesteam_df = sqlContext.createDataFrame(salesteam_pd)
salespipeline_df = sqlContext.createDataFrame(salespipeline_pd)
products_df = sqlContext.createDataFrame(products_pd)
accounts_df = sqlContext.createDataFrame(accounts_pd)
clicks_df = sqlContext.createDataFrame(clicks_pd)


## Inspecting the DataFrames created

In [6]:


salespipeline_df.show(5)
salesteam_df.show(5)
products_df.show()
clicks_df.show(5)
accounts_df.show(5)

+----------+--------------+---------------+-----------+--------+----------+-----------+----------+
|   account|opportunity_id|    sales_agent| deal_stage| product|close_date|close_value|created_on|
+----------+--------------+---------------+-----------+--------+----------+-----------+----------+
|Sunnamplex|      67HY0MW7|  Donn Cantrell|        Won|GTXBasic|2017-05-06|      500.0|2017-04-24|
|      null|      MA82HVCI| James Ascencio|In_Progress|  GTXPro|      null|       null|2017-06-15|
|      null|      BRL1KVVH| Vicki Laflamme|       Lost|GTXBasic|2017-08-03|        0.0|2017-05-19|
|     Silis|      R22O68FF|Niesha Huffines|        Won|GTXBasic|2017-06-27|      524.0|2017-03-21|
|     Silis|      J78AK31N|  Kami Bicknell|        Won|  MGRPFU|2017-08-04|     4794.0|2017-05-15|
+----------+--------------+---------------+-----------+--------+----------+-----------+----------+
only showing top 5 rows

+---------------+-------------+---------------+
|    sales_agent|      manager|regio

Refer to and use the five CSVs to answer problems 1-10 below

PROBLEM 1: Display 'Manager' and 'Grand Total Sales', for sales made by the sales agents reporting to these managers

In [7]:
#solution code here
# Join salesteam and salespipeline on sales_agent, keep won deals, then total close_value per manager
salesteam_df.\
join(salespipeline_df,"sales_agent").\
filter("deal_stage=='Won'").\
select("sales_agent","manager","close_value").\
groupBy("manager").sum("close_value").\
show()


+----------------+----------------+
|         manager|sum(close_value)|
+----------------+----------------+
|    Celia Rouche|       2518466.0|
|   Rocco Neubert|       3346813.0|
|   Melvin Marxen|       4265901.0|
|   Summer Sewald|       2915362.0|
|      Cara Losch|       1861751.0|
|Dustin Brinkmann|       3028635.0|
+----------------+----------------+



In [8]:
salesteam_pd.head()

Unnamed: 0,sales_agent,manager,regional_office
0,Donn Cantrell,Rocco Neubert,Central
1,James Ascencio,Summer Sewald,West
2,Vicki Laflamme,Celia Rouche,West
3,Niesha Huffines,Melvin Marxen,East
4,Kami Bicknell,Summer Sewald,West


In [9]:
salespipeline_pd.head()

Unnamed: 0,account,opportunity_id,sales_agent,deal_stage,product,close_date,close_value,created_on
0,Sunnamplex,67HY0MW7,Donn Cantrell,Won,GTXBasic,2017-05-06,500.0,2017-04-24
1,,MA82HVCI,James Ascencio,In_Progress,GTXPro,,,2017-06-15
2,,BRL1KVVH,Vicki Laflamme,Lost,GTXBasic,2017-08-03,0.0,2017-05-19
3,Silis,R22O68FF,Niesha Huffines,Won,GTXBasic,2017-06-27,524.0,2017-03-21
4,Silis,J78AK31N,Kami Bicknell,Won,MGRPFU,2017-08-04,4794.0,2017-05-15


PROBLEM 2: Display 'Sales Agents' and 'Sales' for those sales where the product was sold at a profit

In [10]:
#solution code here

salespipeline_df.\
join(products_df,"product").\
filter("close_value > sales_price").\
select("sales_agent","product").\
show(5)

+-----------------+------------+
|      sales_agent|     product|
+-----------------+------------+
|    Kami Bicknell|GTXPlusBasic|
|Marty Freudenburg|GTXPlusBasic|
| Gladys Colclough|GTXPlusBasic|
|   Wilburn Farren|GTXPlusBasic|
|  Kary Hendrixson|GTXPlusBasic|
+-----------------+------------+
only showing top 5 rows



PROBLEM 3: Display the 'Opportunity ID' and 'Days Taken to Close', for opportunities that were closed within a month

In [11]:
#solution code here
salespipeline_df.\
select("opportunity_id",datediff("close_date","created_on").alias("Days Taken to close")).\
filter(datediff("close_date","created_on")<30).\
show()


+--------------+-------------------+
|opportunity_id|Days Taken to close|
+--------------+-------------------+
|      67HY0MW7|                 12|
|      4VHUTHOJ|                  1|
|      TMJ0OJ0B|                 10|
|      B22V5Z3B|                 13|
|      T8QRTV6F|                 10|
|      H7ZQUWDJ|                  9|
|      KJ1JOOQ0|                 13|
|      88MXDSGR|                  9|
|      4RE1ST7V|                 22|
|      4FJHFAH7|                  6|
|      H0NRZ2VX|                 22|
|      VB2E4FRU|                  8|
|      FLXHSKT4|                 13|
|      FJVFOQPG|                 12|
|      WHRDPR4H|                 12|
|      LWZPACKS|                  7|
|      377G0K33|                  2|
|      IF0LPAQA|                 15|
|      NEJZ68R1|                  5|
|      ZQLLEUES|                 13|
+--------------+-------------------+
only showing top 20 rows



PROBLEM 4: Display the product(s) that got the maximum leads (by count) generated from a paid source

In [12]:
#solution code here
clicks_df.join(salespipeline_df,"created_on").\
filter("source=='Paid'").\
select("product").\
groupBy("product").count().\
orderBy(desc("count")).\
show()

+------------+-----+
|     product|count|
+------------+-----+
|    GTXBasic|71644|
|GTXPlusBasic|66316|
|      GTXPro|55036|
|      MGRPFS|53306|
|      MGRPFU|46500|
|  GTXPlusPro|37029|
|     GTK500U| 1409|
+------------+-----+



PROBLEM 5: Display 'Sales Agent' and 'Opportunity Count', for those sales agents who lost at least two opportunities

In [13]:
#solution code here
salespipeline_df.select("deal_stage","sales_agent","opportunity_id").\
filter("deal_stage=='Lost'").\
groupBy("sales_agent").agg(countDistinct("opportunity_id")).\
show()



+------------------+---------------------+
|       sales_agent|count(opportunity_id)|
+------------------+---------------------+
|   Darcel Schlecht|                  337|
|     Kami Bicknell|                  134|
|    Vicki Laflamme|                  162|
|      Elease Gluck|                   62|
|Jonathan Berthelot|                  185|
|   Daniell Hammack|                   80|
|     Anna Snelling|                  140|
|      Cassey Cress|                  137|
|     Garret Kinder|                   63|
|    Markita Hansen|                  115|
|      Reed Clapper|                   87|
|Rosie Papadopoulos|                   56|
|   Maureen Marcano|                  119|
|  Violet Mclelland|                  111|
|  Gladys Colclough|                  149|
|         Boris Faz|                   63|
|    Wilburn Farren|                   44|
| Versie Hillebrand|                  118|
| Marty Freudenburg|                  120|
|    Cecily Lampkin|                   86|
+----------
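The problem asks only for agents with at least two lost opportunities, and the query above does not apply that threshold explicitly. Below is a minimal plain-Python sketch of the missing step, using invented rows. The agent names, opportunity IDs and the helper `lost_at_least` are hypothetical and are not taken from sales_pipeline.csv.

```python
from collections import defaultdict

# Hypothetical (sales_agent, deal_stage, opportunity_id) rows, for illustration only.
rows = [
    ("Agent A", "Lost", "OPP1"),
    ("Agent A", "Lost", "OPP2"),
    ("Agent A", "Won", "OPP3"),
    ("Agent B", "Lost", "OPP4"),
    ("Agent C", "Won", "OPP5"),
]

def lost_at_least(rows, minimum=2):
    """Return {agent: distinct lost-opportunity count} for counts >= minimum."""
    lost = defaultdict(set)
    for agent, stage, opp_id in rows:
        if stage == "Lost":
            lost[agent].add(opp_id)  # a set keeps the count distinct
    return {agent: len(ids) for agent, ids in lost.items() if len(ids) >= minimum}

result = lost_at_least(rows)
print(result)  # {'Agent A': 2}
```

In the Spark version, the same effect would presumably come from filtering on the aggregated count column after the groupBy.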

PROBLEM 6: Display 'Account' and 'Revenue' for telecom accounts, in ascending order of revenue

In [14]:
#solution code here
clicks_df.\
filter("industry=='Telecom'").\
join(salespipeline_df,"created_on").\
join(accounts_df,"account").\
select("account","revenue").\
distinct().\
orderBy("revenue").\
show()


+--------------------+-------+
|             account|revenue|
+--------------------+-------+
|          Stanredtax|  14.79|
|          Fasehatice|   19.2|
|            Kan-code|  22.63|
|           Treequote|   73.1|
|           Konmatfix|  82.96|
|Olivia Pope & Ass...|  97.94|
|         Donquadtech| 110.88|
|           Warephase| 130.62|
|        Soylent Corp| 136.89|
|         Iselectrics| 138.63|
|              Yearin| 144.68|
|           Ganjaflex|  161.8|
|     Sterling Cooper| 204.47|
|            Rangreen| 211.12|
|            Xx-zobam| 221.86|
|              Hatfan| 223.54|
|            Betatech| 239.22|
|           Duff Beer| 244.32|
|         Good Burger| 247.91|
|            Blackzim| 256.32|
+--------------------+-------+
only showing top 20 rows



PROBLEM 7: Display the bottom five 'Industries' and their 'Revenue', ranked by revenue generated

In [15]:
#solution code here
clicks_df.\
join(salespipeline_df,"created_on").\
join(accounts_df,"account").\
select("industry","revenue").\
distinct().\
orderBy(desc("revenue")).\
limit(5).\
show()


+--------------------+-------+
|            industry|revenue|
+--------------------+-------+
|         Health Care| 6085.6|
|           Education| 6085.6|
|                  IT| 6085.6|
|                SaaS| 6085.6|
|Retail/Entertainment| 6085.6|
+--------------------+-------+
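The query above sorts in descending order and keeps distinct (industry, revenue) pairs, so it lists the largest single values rather than the five lowest industry totals. One possible reading of the problem is "sum revenue per industry, then take the five smallest totals". The sketch below illustrates that reading in plain Python. The data and the helper `bottom_industries` are invented for the example.

```python
from collections import defaultdict

# Hypothetical (industry, revenue) pairs, one per account; the values are invented.
accounts = [
    ("Telecom", 100.0), ("Telecom", 50.0), ("Retail", 30.0),
    ("Finance", 400.0), ("Health", 20.0), ("Energy", 75.0), ("Media", 10.0),
]

def bottom_industries(accounts, n=5):
    """Sum revenue per industry and return the n lowest totals, smallest first."""
    totals = defaultdict(float)
    for industry, revenue in accounts:
        totals[industry] += revenue
    # An ascending sort puts the lowest-revenue industries first.
    return sorted(totals.items(), key=lambda item: item[1])[:n]

bottom = bottom_industries(accounts)
for industry, revenue in bottom:
    print(industry, revenue)
```

Whether revenue should be summed per industry or taken from another column depends on the schema of accounts.csv, which this sketch does not assume.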



PROBLEM 8: Display 'Month of Year' vs 'Sales', for GTXBasic. NOTE: "Month of Year" means the calendar month irrespective of year (e.g. Jan), whereas "Month" means a month of a specific year (e.g. Jan 2020, Jan 2021, etc.)

In [16]:
# A UDF like this could be used, but it raised a Java socket timeout here

#def convertMonth(number):
#    monthdict = {1:"Jan",2:"Feb",3:"Mar",4:"Apr",5:"May",6:"Jun",7:"Jul",8:"Aug",9:"Sep",10:"Oct",11:"Nov",12:"Dec"}
#    return monthdict.get(number)  # None for a missing or out-of-range month

#convertMonthUDF = udf(convertMonth)

In [17]:
#solution code here

salespipeline_df.\
filter("product=='GTXBasic'").\
select(month("close_date").alias("Month Of Year"),"product").\
groupBy("Month Of Year").agg(count("Month Of Year").alias("Sales")).\
orderBy("Month Of Year").show()

+-------------+-----+
|Month Of Year|Sales|
+-------------+-----+
|         null|    0|
|            3|   63|
|            4|  107|
|            5|  154|
|            6|  188|
|            7|  243|
|            8|  313|
|            9|  378|
|           10|  265|
|           11|  123|
|           12|  307|
+-------------+-----+
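The output above lists months as numbers, while the problem statement gives "Jan" as the expected form. Below is a small sketch of the number-to-name mapping using the standard library's `calendar.month_abbr`. The helper name `month_abbr` is an assumption, and the abbreviations depend on the active locale (English by default).

```python
import calendar

def month_abbr(number):
    """Map 1..12 to 'Jan'..'Dec'; return None for anything else, such as a null month."""
    if isinstance(number, int) and 1 <= number <= 12:
        return calendar.month_abbr[number]
    return None

# Month numbers as they appear in the table above, including the null row.
months = [None, 3, 4, 12]
labels = [month_abbr(m) for m in months]
print(labels)
```

The same mapping could be applied after collecting the small aggregated result to pandas, which avoids running a Python UDF on the executors.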



PROBLEM 9: Which sales agent(s) never lost a deal? Display as a dictionary {sales agent: sales}

In [18]:
#solution code here
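No solution was recorded for this problem. As a hedged sketch of the logic only, the plain-Python example below builds the requested dictionary from invented rows. It assumes "sales" means the total close_value of won deals. The rows and the helper `never_lost` are hypothetical.

```python
from collections import defaultdict

# Hypothetical (sales_agent, deal_stage, close_value) rows, for illustration only.
rows = [
    ("Agent A", "Won", 500.0),
    ("Agent A", "Lost", 0.0),
    ("Agent B", "Won", 1200.0),
    ("Agent B", "Won", 300.0),
    ("Agent C", "In_Progress", None),
]

def never_lost(rows):
    """Return {agent: total won close_value} for agents with no 'Lost' deal."""
    losers = {agent for agent, stage, _ in rows if stage == "Lost"}
    sales = defaultdict(float)
    for agent, stage, value in rows:
        if agent in losers:
            continue
        # Adding 0.0 still creates the key, so agents without won deals appear too.
        sales[agent] += value if stage == "Won" and value is not None else 0.0
    return dict(sales)

result = never_lost(rows)
print(result)  # {'Agent B': 1500.0, 'Agent C': 0.0}
```

On the Spark side, one option would be to compute the set of agents with a lost deal, exclude them with an anti join, aggregate, and convert the collected rows to a dict.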



PROBLEM 10: Display 'Sales Agents', 'Product', and 'Sales', for those sales agents who closed more than one deal on the same day

In [19]:
#solution code here
salespipeline_df.\
join(products_df,"product").\
select("sales_agent","product").\
filter(datediff("close_date","created_on")==0).\
show()

+-----------------+------------+
|      sales_agent|     product|
+-----------------+------------+
|  Kary Hendrixson|GTXPlusBasic|
|  Niesha Huffines|GTXPlusBasic|
|Versie Hillebrand|GTXPlusBasic|
|Marty Freudenburg|    GTXBasic|
|  Darcel Schlecht|    GTXBasic|
|     Cassey Cress|      GTXPro|
|    Hayden Neloms|      MGRPFU|
|  Daniell Hammack|      MGRPFS|
|        Zane Levy|      MGRPFS|
+-----------------+------------+
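The query above keeps deals whose close_date equals created_on, that is, deals closed on the day they were created. Another reading of the problem is "agents with two or more won deals sharing one close_date". The plain-Python sketch below illustrates that second reading with invented rows. The helper `multi_close_days` is hypothetical.

```python
from collections import Counter

# Hypothetical (sales_agent, product, close_date, deal_stage) rows.
rows = [
    ("Agent A", "GTXBasic", "2017-05-06", "Won"),
    ("Agent A", "GTXPro", "2017-05-06", "Won"),
    ("Agent A", "MGRPFU", "2017-06-01", "Won"),
    ("Agent B", "GTXBasic", "2017-05-06", "Won"),
    ("Agent B", "GTXPro", "2017-05-07", "Lost"),
]

def multi_close_days(rows):
    """Return (agent, product, close_date) for won deals on days with >1 won deal per agent."""
    won = [r for r in rows if r[3] == "Won"]
    per_day = Counter((agent, close_date) for agent, _, close_date, _ in won)
    busy = {key for key, n in per_day.items() if n > 1}
    return [
        (agent, product, close_date)
        for agent, product, close_date, _ in won
        if (agent, close_date) in busy
    ]

deals = multi_close_days(rows)
for deal in deals:
    print(deal)
```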



Refer to and use Orchestra.json to answer problems 11-13 below

## Extracting data from JSON into DataFrames for Spark

In [21]:
with open("./Orchestra.json",encoding='utf-8') as file:
    data = json.load(file)
    
json_df = json_normalize(data['programs'])
json_df.head()





Unnamed: 0,id,programID,orchestra,season,concerts,works
0,00646b9f-fec7-4ffb-9fb1-faae410bd9dc-0.1,3853,New York Philharmonic,1842-43,"[{'eventType': 'Subscription Season', 'Locatio...","[{'ID': '52446*', 'composerName': 'Beethoven, ..."
1,1118e84e-eb59-46cc-9119-d903375e65e6-0.1,5178,New York Philharmonic,1842-43,"[{'eventType': 'Subscription Season', 'Locatio...","[{'ID': '52437*', 'composerName': 'Beethoven, ..."
2,08536612-27c3-437e-9b44-def21034b06c-0.1,10785,Musicians from the New York Philharmonic,1842-43,"[{'eventType': 'Special', 'Location': 'Manhatt...","[{'ID': '52364*1', 'composerName': 'Beethoven,..."
3,81a3b8de-1737-4c9e-9318-b839f7c7c4c0-0.1,5887,New York Philharmonic,1842-43,"[{'eventType': 'Subscription Season', 'Locatio...","[{'ID': '52434*', 'composerName': 'Beethoven, ..."
4,09581bb7-8855-4965-b302-fc54cc669041-0.1,305,New York Philharmonic,1843-44,"[{'eventType': 'Subscription Season', 'Locatio...","[{'ID': '52453*', 'composerName': 'Beethoven, ..."


In [None]:
# Flattening the nested fields into separate tables

In [22]:
works_data = json_normalize(data = data['programs'],
                            record_path ='works', 
                            meta =['id', 'orchestra', 'programID', 'season'])
works_data.head()



Unnamed: 0,ID,composerName,workTitle,conductorName,soloists,movement,interval,movement._,movement.em,id,orchestra,programID,season
0,52446*,"Beethoven, Ludwig van","SYMPHONY NO. 5 IN C MINOR, OP.67","Hill, Ureli Corelli",[],,,,,00646b9f-fec7-4ffb-9fb1-faae410bd9dc-0.1,New York Philharmonic,3853,1842-43
1,8834*4,"Weber, Carl Maria Von",OBERON,"Timm, Henry C.","[{'soloistName': 'Otto, Antoinette', 'soloistI...","""Ozean, du Ungeheuer"" (Ocean, thou mighty mons...",,,,00646b9f-fec7-4ffb-9fb1-faae410bd9dc-0.1,New York Philharmonic,3853,1842-43
2,3642*,"Hummel, Johann","QUINTET, PIANO, D MINOR, OP. 74",,"[{'soloistName': 'Scharfenberg, William', 'sol...",,,,,00646b9f-fec7-4ffb-9fb1-faae410bd9dc-0.1,New York Philharmonic,3853,1842-43
3,0*,,,,[],,Intermission,,,00646b9f-fec7-4ffb-9fb1-faae410bd9dc-0.1,New York Philharmonic,3853,1842-43
4,8834*3,"Weber, Carl Maria Von",OBERON,"Etienne, Denis G.",[],Overture,,,,00646b9f-fec7-4ffb-9fb1-faae410bd9dc-0.1,New York Philharmonic,3853,1842-43


In [23]:
soloist_data = json_normalize(data = data['programs'],
                              record_path =['works', 'soloists'],
                              meta =['id'])
  
soloist_data.head()



Unnamed: 0,soloistName,soloistInstrument,soloistRoles,id
0,"Otto, Antoinette",Soprano,S,00646b9f-fec7-4ffb-9fb1-faae410bd9dc-0.1
1,"Scharfenberg, William",Piano,A,00646b9f-fec7-4ffb-9fb1-faae410bd9dc-0.1
2,"Hill, Ureli Corelli",Violin,A,00646b9f-fec7-4ffb-9fb1-faae410bd9dc-0.1
3,"Derwort, G. H.",Viola,A,00646b9f-fec7-4ffb-9fb1-faae410bd9dc-0.1
4,"Boucher, Alfred",Cello,A,00646b9f-fec7-4ffb-9fb1-faae410bd9dc-0.1
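To make the effect of `record_path=['works', 'soloists']` with `meta=['id']` concrete, the sketch below performs the same flattening by hand on a tiny invented structure: one output row per soloist, each tagged with the id of its parent program. The sample data and the helper `flatten_soloists` are hypothetical and only mimic the layout of Orchestra.json.

```python
# Hypothetical miniature of the layout: programs -> works -> soloists.
programs = [
    {"id": "p1", "works": [
        {"workTitle": "W1", "soloists": [
            {"soloistName": "X", "soloistInstrument": "Piano"},
        ]},
        {"workTitle": "W2", "soloists": []},  # works without soloists add no rows
    ]},
    {"id": "p2", "works": [
        {"workTitle": "W3", "soloists": [
            {"soloistName": "Y", "soloistInstrument": "Violin"},
            {"soloistName": "Z", "soloistInstrument": "Cello"},
        ]},
    ]},
]

def flatten_soloists(programs):
    """Emit one dict per soloist, carrying the parent program's id as metadata."""
    rows = []
    for program in programs:
        for work in program.get("works", []):
            for soloist in work.get("soloists", []):
                rows.append({**soloist, "id": program["id"]})
    return rows

rows = flatten_soloists(programs)
for row in rows:
    print(row)
```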


PROBLEM 11: Display the instrument played by Lehmann Caroline

In [24]:
soloist_df = sqlContext.createDataFrame(soloist_data)
soloist_df.show(5)

+--------------------+-----------------+------------+--------------------+
|         soloistName|soloistInstrument|soloistRoles|                  id|
+--------------------+-----------------+------------+--------------------+
|    Otto, Antoinette|          Soprano|           S|00646b9f-fec7-4ff...|
|Scharfenberg, Wil...|            Piano|           A|00646b9f-fec7-4ff...|
| Hill, Ureli Corelli|           Violin|           A|00646b9f-fec7-4ff...|
|      Derwort, G. H.|            Viola|           A|00646b9f-fec7-4ff...|
|     Boucher, Alfred|            Cello|           A|00646b9f-fec7-4ff...|
+--------------------+-----------------+------------+--------------------+
only showing top 5 rows



In [25]:
#solution code here
soloist_df.filter("soloistName == 'Lehmann, Caroline'").select("soloistName","soloistInstrument").show()

+-----------------+-----------------+
|      soloistName|soloistInstrument|
+-----------------+-----------------+
|Lehmann, Caroline|          Soprano|
|Lehmann, Caroline|          Soprano|
|Lehmann, Caroline|          Soprano|
|Lehmann, Caroline|          Soprano|
|Lehmann, Caroline|          Soprano|
|Lehmann, Caroline|          Soprano|
+-----------------+-----------------+



PROBLEM 12: Display all vocalists

In [26]:
#solution code here
soloist_df.filter("soloistInstrument=='Vocalist'").select("soloistName","soloistInstrument").show()

+--------------------+-----------------+
|         soloistName|soloistInstrument|
+--------------------+-----------------+
|       Loder, Edward|         Vocalist|
|       Loder, Edward|         Vocalist|
|       Ricci, Amalia|         Vocalist|
|          Sanquirico|         Vocalist|
|          Sanquirico|         Vocalist|
|       Ricci, Amalia|         Vocalist|
|       Loder, Edward|         Vocalist|
|       Loder, Edward|         Vocalist|
|       Loder, Edward|         Vocalist|
|             De Luce|         Vocalist|
|              Munson|         Vocalist|
|Massett, J. Schwartz|         Vocalist|
|             Arnoult|         Vocalist|
|             Arnoult|         Vocalist|
|                Mott|         Vocalist|
|                Mott|         Vocalist|
|     Northall, Julia|         Vocalist|
|     Northall, Julia|         Vocalist|
|     Northall, Julia|         Vocalist|
|                Pico|         Vocalist|
+--------------------+-----------------+
only showing top

PROBLEM 13: Display the orchestra that played under program ID 2561

In [27]:
#solution code here
orchestra_df = works_data[works_data["programID"]=="2561"]
orchestra_df.head(10)

Unnamed: 0,ID,composerName,workTitle,conductorName,soloists,movement,interval,movement._,movement.em,id,orchestra,programID,season
363,4198*,"Gade, Niels","SYMPHONY NO. 4, B FLAT MAJOR, OP. 20","Eisfeld, Theodore",[],,,,,5d1e33be-38d3-40e5-b848-cc88dd835574-0.1,New York Philharmonic,2561,1853-54
364,4202*2,"Lortzing, Albert",ZAR UND ZIMMERMANN,"Eisfeld, Theodore","[{'soloistName': 'Schumann, Julius', 'soloistI...","""Sonst Spielt'ich Mit Zepter"" (or ""Song of the...",,,,5d1e33be-38d3-40e5-b848-cc88dd835574-0.1,New York Philharmonic,2561,1853-54
365,4206*,"Spohr, Louis","CONCERTO, VIOLIN, NO. 14, OP. 110 (CONCERTINO ...","Eisfeld, Theodore","[{'soloistName': 'Burke, Joseph', 'soloistInst...",,,,,5d1e33be-38d3-40e5-b848-cc88dd835574-0.1,New York Philharmonic,2561,1853-54
366,0*,,,,[],,Intermission,,,5d1e33be-38d3-40e5-b848-cc88dd835574-0.1,New York Philharmonic,2561,1853-54
367,4211*1,"Spohr, Louis",FAUST,"Eisfeld, Theodore",[],Overture,,,,5d1e33be-38d3-40e5-b848-cc88dd835574-0.1,New York Philharmonic,2561,1853-54
368,1587*2,"Chopin, Frédéric","CONCERTO, PIANO, NO. 1, E MINOR, OP. 11","Eisfeld, Theodore","[{'soloistName': 'Hoffman, Richard', 'soloistI...",Romance: Larghetto,,,,5d1e33be-38d3-40e5-b848-cc88dd835574-0.1,New York Philharmonic,2561,1853-54
369,1587*3,"Chopin, Frédéric","CONCERTO, PIANO, NO. 1, E MINOR, OP. 11","Eisfeld, Theodore","[{'soloistName': 'Hoffman, Richard', 'soloistI...",Rondo: Vivace,,,,5d1e33be-38d3-40e5-b848-cc88dd835574-0.1,New York Philharmonic,2561,1853-54
370,52552*48,"Mendelssohn, Felix","ST. PAUL, OP. 36","Eisfeld, Theodore","[{'soloistName': 'Schumann, Julius', 'soloistI...",Aria (Unspecified),,,,5d1e33be-38d3-40e5-b848-cc88dd835574-0.1,New York Philharmonic,2561,1853-54
371,52429*,"Beethoven, Ludwig van","SYMPHONY NO. 1 IN C MAJOR, OP.21","Eisfeld, Theodore",[],,,,,5d1e33be-38d3-40e5-b848-cc88dd835574-0.1,New York Philharmonic,2561,1853-54


Refer to and use Orchestra.xml to answer problems 14-15 below

PROBLEM 14: Display the locations used for events at 8:15 PM

In [28]:
#solution code here
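No solution was recorded for this problem. The sketch below shows one way to approach it with the standard library's `xml.etree.ElementTree`. The tag names (`concertInfo`, `Location`, `Time`), the time format `8:15PM` and the sample document are all assumptions, since the schema of Orchestra.xml is not shown in this notebook.

```python
import xml.etree.ElementTree as ET

# Hypothetical snippet; the real Orchestra.xml may use different tag names.
SAMPLE_XML = """
<programs>
  <program>
    <programID>1</programID>
    <concertInfo><Location>Manhattan, NY</Location><Time>8:15PM</Time></concertInfo>
  </program>
  <program>
    <programID>2</programID>
    <concertInfo><Location>Brooklyn, NY</Location><Time>8:00PM</Time></concertInfo>
    <concertInfo><Location>Manhattan, NY</Location><Time>8:15PM</Time></concertInfo>
  </program>
</programs>
"""

def locations_at(root, time_text):
    """Return the sorted, de-duplicated locations of concerts starting at time_text."""
    found = set()
    for concert in root.iter("concertInfo"):
        if (concert.findtext("Time") or "").strip() == time_text:
            location = (concert.findtext("Location") or "").strip()
            if location:
                found.add(location)
    return sorted(found)

root = ET.fromstring(SAMPLE_XML.strip())
locations = locations_at(root, "8:15PM")
print(locations)  # ['Manhattan, NY']
```

For the real file, `ET.parse("./Orchestra.xml").getroot()` would replace the `fromstring` call, with tag names adjusted to the actual schema.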

PROBLEM 15: Display total number of programs

In [29]:
#solution code here
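No solution was recorded here either. Assuming each program is stored as a `program` element (an assumption about Orchestra.xml, not something shown in this notebook), the count reduces to iterating over those elements. The sample document and the helper `count_programs` are hypothetical.

```python
import xml.etree.ElementTree as ET

# Hypothetical three-program document standing in for Orchestra.xml.
SAMPLE_XML = (
    "<programs>"
    "<program><id>a</id></program>"
    "<program><id>b</id></program>"
    "<program><id>c</id></program>"
    "</programs>"
)

def count_programs(root, tag="program"):
    """Count every element named `tag` anywhere below (or at) the root."""
    return sum(1 for _ in root.iter(tag))

root = ET.fromstring(SAMPLE_XML)
total = count_programs(root)
print(total)  # 3
```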

********************************************* Test ends here **************************************************