## Spark basics Review
#### Word count example

In [10]:
%%writefile /tmp/tmp.txt
this is a text file

Overwriting /tmp/tmp.txt


In [None]:
# If you need to install spark
!pip install  pyspark

In [3]:
from pyspark.sql import SparkSession

In [4]:
spark = SparkSession.builder.getOrCreate()

In [5]:
sc = spark.sparkContext

In [6]:
rdd = sc.textFile("/tmp/tmp.txt")

In [8]:
words_rdd = rdd.flatMap(lambda line: line.split()).map(lambda word: (word, 1))

In [9]:
words_rdd.countByKey()

defaultdict(int, {'this': 1, 'is': 1, 'a': 1, 'text': 1, 'file': 1})

## Spark SQL

* Download file from this [LINK](https://www.kaggle.com/promptcloud/careerbuilder-job-listing-2020/download)
* Unzip the archive and put `marketing_sample_for_careerbuilder_usa-careerbuilder_job_listing__20200401_20200630__30k_data.ldjson` file in `data/raw`

In [1]:
!ls data/raw

marketing_sample_for_careerbuilder_usa-careerbuilder_job_listing__20200401_20200630__30k_data.ldjson


In [14]:
df = spark.read.json("data/raw")

In [47]:
df.printSchema()

root
 |-- city: string (nullable = true)
 |-- company_description: string (nullable = true)
 |-- company_name: string (nullable = true)
 |-- contact_email: string (nullable = true)
 |-- country: string (nullable = true)
 |-- crawl_timestamp: string (nullable = true)
 |-- domain: string (nullable = true)
 |-- duplicate_status: string (nullable = true)
 |-- fitness_score: long (nullable = true)
 |-- geo: string (nullable = true)
 |-- has_expired: string (nullable = true)
 |-- html_job_description: string (nullable = true)
 |-- inferred_city: string (nullable = true)
 |-- inferred_country: string (nullable = true)
 |-- inferred_iso2_lang_code: string (nullable = true)
 |-- inferred_iso3_lang_code: string (nullable = true)
 |-- inferred_salary_currency: string (nullable = true)
 |-- inferred_salary_from: string (nullable = true)
 |-- inferred_salary_time_unit: string (nullable = true)
 |-- inferred_salary_to: string (nullable = true)
 |-- inferred_state: string (nullable = true)
 |-- is_co

### Compute job offers per city: SQL example

In [26]:
df.registerTempTable("jobs")

In [28]:
result_df = spark.sql("""
SELECT city, count(*) AS nb_jobs
FROM jobs
GROUP BY city
""")

In [30]:
result_df.show()

+-----------------+-------+
|             city|nb_jobs|
+-----------------+-------+
|            Tyler|     18|
|       Prattville|      2|
|          Hanover|     32|
|Kingsford Heights|      2|
|           Nahant|      8|
|        Worcester|     13|
| North Saint Paul|      1|
|           Agawam|      5|
|        Fairbanks|      8|
|            Leola|      1|
|         Bluffton|      6|
|      Middlefield|      1|
|           Aitkin|      1|
|          Jemison|      2|
|     Saint George|      1|
|           Grimes|      2|
|           Bingen|      2|
|     Harleysville|      2|
|          Minster|      1|
|      Springfield|    105|
+-----------------+-------+
only showing top 20 rows



### Compute job offers per city: Dataframe DSL example

In [34]:
result_df = df.groupBy("city").count()

In [35]:
result_df.show()

+-----------------+-----+
|             city|count|
+-----------------+-----+
|            Tyler|   18|
|       Prattville|    2|
|          Hanover|   32|
|Kingsford Heights|    2|
|           Nahant|    8|
|        Worcester|   13|
| North Saint Paul|    1|
|           Agawam|    5|
|        Fairbanks|    8|
|            Leola|    1|
|         Bluffton|    6|
|      Middlefield|    1|
|           Aitkin|    1|
|          Jemison|    2|
|     Saint George|    1|
|           Grimes|    2|
|           Bingen|    2|
|     Harleysville|    2|
|          Minster|    1|
|      Springfield|  105|
+-----------------+-----+
only showing top 20 rows



### Write Dataframe

#### CSV

In [40]:
result_df.repartition(1).write.mode("overwrite").csv("data/csv")

In [41]:
!ls data/csv/

_SUCCESS
part-00000-08295fe3-95a6-4551-8635-7d0e68c341b0-c000.csv


#### Parquet

In [43]:
result_df.write.mode("overwrite").parquet("data/parquet")

In [44]:
!ls data/parquet/

_SUCCESS
part-00000-06c4417c-50c9-4a19-ab12-1f2b903ae5cb-c000.snappy.parquet
part-00001-06c4417c-50c9-4a19-ab12-1f2b903ae5cb-c000.snappy.parquet
part-00002-06c4417c-50c9-4a19-ab12-1f2b903ae5cb-c000.snappy.parquet
part-00003-06c4417c-50c9-4a19-ab12-1f2b903ae5cb-c000.snappy.parquet
part-00004-06c4417c-50c9-4a19-ab12-1f2b903ae5cb-c000.snappy.parquet
part-00005-06c4417c-50c9-4a19-ab12-1f2b903ae5cb-c000.snappy.parquet
part-00006-06c4417c-50c9-4a19-ab12-1f2b903ae5cb-c000.snappy.parquet
part-00007-06c4417c-50c9-4a19-ab12-1f2b903ae5cb-c000.snappy.parquet
part-00008-06c4417c-50c9-4a19-ab12-1f2b903ae5cb-c000.snappy.parquet
part-00009-06c4417c-50c9-4a19-ab12-1f2b903ae5cb-c000.snappy.parquet
part-00010-06c4417c-50c9-4a19-ab12-1f2b903ae5cb-c000.snappy.parquet
part-00011-06c4417c-50c9-4a19-ab12-1f2b903ae5cb-c000.snappy.parquet
part-00012-06c4417c-50c9-4a19-ab12-1f2b903ae5cb-c000.snappy.parquet
part-00013-06c4417c-50c9-4a19-ab12-1f2b903ae5cb-c000.snappy.parquet
part-00014-06c4417c-50c9

### Filter clause

In [46]:
df.filter(df["city"] == "Tyler").count()

18

In [48]:
df.select(df["city"], df["inferred_salary_to"]).show()

+--------------------+------------------+
|                city|inferred_salary_to|
+--------------------+------------------+
|             Houston|              null|
|          Cincinnati|              null|
|             Peabody|              null|
|             Villard|             14.00|
|           Anchorage|              null|
|              Urbana|              null|
|South Brunswick T...|              null|
|              Gretna|              null|
|            Stockton|              null|
|               Tampa|              null|
|               Tulsa|              null|
|             Seattle|              null|
|          Hagerstown|         62,000.00|
|          Long Beach|              null|
|              Keyser|              null|
|       Coral Springs|              null|
|             Seattle|              null|
|               Union|              null|
|            Fountain|              null|
|              Woburn|              null|
+--------------------+------------

### Exercice 3

In [57]:
from pyspark.sql import functions as fn

In [58]:
fn.min

<function pyspark.sql.functions.min(col)>

In [49]:
jobs_per_city_df = df.groupBy("city").count()

In [54]:
recent_100_post = df.orderBy("post_date").limit(100)#.select(min()).show()

In [60]:
recent_100_post.select(fn.min(recent_100_post.post_date), fn.max(recent_100_post.post_date)).show()

+--------------+--------------+
|min(post_date)|max(post_date)|
+--------------+--------------+
|    2020-03-30|    2020-03-31|
+--------------+--------------+



In [61]:
recent_100_post.write.csv("data/most_recent")

#### Compte top 3 city having the more job posts

In [67]:
top_3_cities_with_jobs = jobs_per_city_df.orderBy("count", ascending=False).limit(3)
top_3_cities_with_jobs.show()

+---------+-----+
|     city|count|
+---------+-----+
|   Boston|  262|
|  Chicago|  218|
|Charlotte|  216|
+---------+-----+



#### Display offers in top 3 citys
##### Solution 1

In [None]:
df.registerTempTable("jobs")

In [74]:
sql_query = """
SELECT job_title, company_name, city 
FROM jobs
WHERE city = "Boston" OR city = "Chicago" OR city = "Charlotte"
"""

In [75]:
spark.sql(sql_query).show()

+--------------------+--------------------+---------+
|           job_title|        company_name|     city|
+--------------------+--------------------+---------+
|Junior Software D...|          Skillstorm|  Chicago|
|Warehouse Operato...|  Amazon Fulfillment|   Boston|
|Amazon Warehouse ...|  Amazon Fulfillment|   Boston|
|Delivery Driver- ...|Industrial Staffi...|Charlotte|
|Global Clinical L...|                GPAC|   Boston|
|Amazon Warehouse ...|  Amazon Fulfillment|   Boston|
|Telephone Line Wo...|Chicago Transit A...|  Chicago|
|Full-time/Part-ti...|  Amazon Fulfillment|   Boston|
|Project Manager (...|             Intrado|  Chicago|
|Amazon Prime Shopper|  Amazon Fulfillment|   Boston|
|Customer Service ...|   Mommy Jobs Online|Charlotte|
|Forklift Operator...|  Abacus Corporation|Charlotte|
|Registered Nurse ...|Favorite Healthca...|   Boston|
|         UX Designer|Signature Consult...|Charlotte|
|Senior Sales Manager|PointOne Recruiti...|  Chicago|
|     Warehouse Staff|      

##### Solution 2

In [77]:
sql_query = """
SELECT job_title, company_name, city 
FROM jobs
WHERE city IN ("Boston", "Chicago", "Charlotte")
"""

In [78]:
spark.sql(sql_query).show(2)

+--------------------+------------------+-------+
|           job_title|      company_name|   city|
+--------------------+------------------+-------+
|Junior Software D...|        Skillstorm|Chicago|
|Warehouse Operato...|Amazon Fulfillment| Boston|
+--------------------+------------------+-------+
only showing top 2 rows



##### Solution 3

In [80]:
top_3_cities_with_jobs.registerTempTable("top_3_cities_with_jobs")

In [82]:
sql_query = """
SELECT job_title, company_name, city 
FROM jobs
WHERE city IN (SELECT city FROM top_3_cities_with_jobs)
"""

In [83]:
spark.sql(sql_query).show(2)

+--------------------+------------------+-------+
|           job_title|      company_name|   city|
+--------------------+------------------+-------+
|Junior Software D...|        Skillstorm|Chicago|
|Warehouse Operato...|Amazon Fulfillment| Boston|
+--------------------+------------------+-------+
only showing top 2 rows



##### Solution 4

In [84]:
top_3_cities_with_jobs.registerTempTable("top_3_cities_with_jobs")

In [87]:
sql_query = """
SELECT job_title, company_name, jobs.city
FROM jobs
INNER JOIN top_3_cities_with_jobs
ON jobs.city = top_3_cities_with_jobs.city
"""

In [89]:
spark.sql(sql_query).show(2)

+--------------------+------------------+-------+
|           job_title|      company_name|   city|
+--------------------+------------------+-------+
|Junior Software D...|        Skillstorm|Chicago|
|Warehouse Operato...|Amazon Fulfillment| Boston|
+--------------------+------------------+-------+
only showing top 2 rows



In [92]:
df.join(top_3_cities_with_jobs, "city", "inner")[["job_title", "company_name", "city"]].show()

+--------------------+--------------------+---------+
|           job_title|        company_name|     city|
+--------------------+--------------------+---------+
|Junior Software D...|          Skillstorm|  Chicago|
|Warehouse Operato...|  Amazon Fulfillment|   Boston|
|Amazon Warehouse ...|  Amazon Fulfillment|   Boston|
|Delivery Driver- ...|Industrial Staffi...|Charlotte|
|Global Clinical L...|                GPAC|   Boston|
|Amazon Warehouse ...|  Amazon Fulfillment|   Boston|
|Telephone Line Wo...|Chicago Transit A...|  Chicago|
|Full-time/Part-ti...|  Amazon Fulfillment|   Boston|
|Project Manager (...|             Intrado|  Chicago|
|Amazon Prime Shopper|  Amazon Fulfillment|   Boston|
|Customer Service ...|   Mommy Jobs Online|Charlotte|
|Forklift Operator...|  Abacus Corporation|Charlotte|
|Registered Nurse ...|Favorite Healthca...|   Boston|
|         UX Designer|Signature Consult...|Charlotte|
|Senior Sales Manager|PointOne Recruiti...|  Chicago|
|     Warehouse Staff|      

### Pandas <=> Spark

In [93]:
import pandas as pd


In [95]:
pd_df = pd.DataFrame({
    "user_id": ["u0", "u1"],
    "age": [28, 50]
})
pd_df

Unnamed: 0,user_id,age
0,u0,28
1,u1,50


In [98]:
# Pandas to spark
spark_df = spark.createDataFrame(pd_df)
spark_df.show()

+-------+---+
|user_id|age|
+-------+---+
|     u0| 28|
|     u1| 50|
+-------+---+



In [99]:
# Spark to Pandas
# Require PyArrow
spark_df.toPandas()

Unnamed: 0,user_id,age
0,u0,28
1,u1,50


## UDF

In [111]:
def clean_str(s):
    return s.lower().replace(" ", "_")

In [108]:
clean_str("Junior Software Dev")

'junior_software_dev'

In [112]:
pd_df = pd.DataFrame({
    "job_title": ["Junior Software Developer", "Data Analyst"]
})
pd_df

Unnamed: 0,job_title
0,Junior Software Developer
1,Data Analyst


In [114]:
pd_df["job_title_fixed"] = pd_df["job_title"].apply(clean_str)
pd_df

Unnamed: 0,job_title,job_title_fixed
0,Junior Software Developer,junior_software_developer
1,Data Analyst,data_analyst


In [115]:
clean_str_udf = fn.udf(clean_str)

In [121]:
df.select("job_title",
          clean_str_udf(df["job_title"]).alias("job_title_fixed")
         ).show(2)



+--------------------+--------------------+
|           job_title|     job_title_fixed|
+--------------------+--------------------+
|Asphalt/Concrete ...|asphalt/concrete_...|
|Amazon Warehouse ...|amazon_warehouse_...|
+--------------------+--------------------+
only showing top 2 rows



In [123]:
spark.udf.register("clean_str", clean_str)
sql_query = """
SELECT job_title, clean_str(job_title) AS job_title_fixed
FROM jobs
LIMIT 2
"""
spark.sql(sql_query).show()

+--------------------+--------------------+
|           job_title|     job_title_fixed|
+--------------------+--------------------+
|Asphalt/Concrete ...|asphalt/concrete_...|
|Amazon Warehouse ...|amazon_warehouse_...|
+--------------------+--------------------+

