In [1]:
import pyspark

In [2]:
# SparkSession manages the connection to the SparkCluster
from pyspark.sql import SparkSession

# Config for large Spark Executors (high RAM and multiple cores)
# with room for significant Python processing taking place on the worker nodes
spark = SparkSession.builder.appName('Hello_Spark').getOrCreate()

In [3]:
spark.sparkContext.getConf().getAll()

[('spark.driver.host', 'ONS21891.Ons.Statistics.gov.uk'),
 ('spark.app.name', 'Hello_Spark'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.app.id', 'local-1525267439591'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.driver.port', '53276')]

### LOAD 3 files.   SSN as common primary key 

In [4]:
csvfile = spark.read.csv("MOCK_DATASET1.csv", header=True, inferSchema=True)

In [5]:
csvfile.printSchema()

root
 |-- id: integer (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)
 |-- job: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- SSN: string (nullable = true)



In [7]:
csvfile.count()

1000

In [8]:
csvfile2 = spark.read.csv("MOCK_DATASET2.csv", header=True, inferSchema=True)

In [9]:
csvfile2.printSchema()

root
 |-- id: integer (nullable = true)
 |-- SSN: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- carmake: string (nullable = true)
 |-- job: string (nullable = true)



In [10]:
csvfile3 = spark.read.csv("MOCK_DATASET3.csv", header=True, inferSchema=True)

In [11]:
csvfile3.printSchema()

root
 |-- id: integer (nullable = true)
 |-- SSN: string (nullable = true)
 |-- NHS_NUMBER: long (nullable = true)
 |-- SEQ: string (nullable = true)
 |-- OUT_DATE: string (nullable = true)
 |-- IN_DATE: string (nullable = true)
 |-- ICD9: string (nullable = true)



In [12]:
csvfile=csvfile.drop('id')
csvfile.sample(fraction=0.1, withReplacement=False).\
select('job','gender','SSN').show(5)

+--------------------+------+-----------+
|                 job|gender|        SSN|
+--------------------+------+-----------+
| Clinical Specialist|Female|562-53-8334|
|Analog Circuit De...|  Male|499-57-3069|
| Structural Engineer|  Male|350-12-2717|
|Research Assistan...|  Male|366-38-4163|
|    Internal Auditor|  Male|566-55-1510|
+--------------------+------+-----------+
only showing top 5 rows



In [13]:
csvfile2=csvfile2.drop('id','job')
csvfile2.sample(fraction=0.1, withReplacement=False).show(5)

+-----------+-------+-------+
|        SSN|Country|carmake|
+-----------+-------+-------+
|350-12-2717|  China|   Ford|
|762-90-2894| Russia|Pontiac|
|566-55-1510|Nigeria| Toyota|
|407-75-9320|  China|    GMC|
|778-86-2365| Poland|   Audi|
+-----------+-------+-------+
only showing top 5 rows



In [14]:
csvfile3=csvfile3.drop('id')
csvfile3.sample(fraction=0.1, withReplacement=False).show(5)

+-----------+----------+---+----------+----------+--------------------+
|        SSN|NHS_NUMBER|SEQ|  OUT_DATE|   IN_DATE|                ICD9|
+-----------+----------+---+----------+----------+--------------------+
|557-75-7804|7790824767|FSB|11/08/2017|28/06/2015| Fx radius head-open|
|570-92-9695|7450273527|FNA|16/08/2017|10/07/2016|Sprain supraspinatus|
|547-42-3591|9982383124|ZTO|21/02/2018|14/09/2015|Mv-train coll-pedest|
|885-35-5120|7276846903|FBC|23/06/2017|17/07/2016|Fem pelvic perito...|
|756-97-6955|7765617338|WOC|05/07/2017|14/04/2016|Second malig neo ...|
+-----------+----------+---+----------+----------+--------------------+
only showing top 5 rows



In [15]:
nation_counts = csvfile2.groupBy('Country').count().show()


+-----------+-----+
|    Country|count|
+-----------+-----+
|       Chad|    1|
|     Russia|   55|
|      Yemen|    3|
|    Senegal|    2|
|     Sweden|   30|
|Philippines|   67|
|    Eritrea|    1|
|   Djibouti|    1|
|   Malaysia|    5|
|     Turkey|    1|
|     Malawi|    3|
|       Iraq|    1|
|    Germany|    2|
|Afghanistan|    1|
|Ivory Coast|    2|
|   Maldives|    1|
|     France|   29|
|     Greece|   14|
|  Sri Lanka|    1|
|       Togo|    2|
+-----------+-----+
only showing top 20 rows



its time to join these 3 dataframes using SSN as the common key

### Kinds of JOINS 
![title](joins.jpg)

In [16]:
joined= csvfile.join(csvfile2, "SSN", how = "inner")

In [17]:
joined.show(10)

+-----------+----------+---------+--------------------+------+-----------+----------+
|        SSN|first_name|last_name|                 job|gender|    Country|   carmake|
+-----------+----------+---------+--------------------+------+-----------+----------+
|661-98-8505|Ermengarde|  Muneely| Associate Professor|Female|      China|Mitsubishi|
|576-77-4107|    Tammie|   Clemow|  Analyst Programmer|  Male|       Togo|  Chrysler|
|194-33-0004|      Yvor| Ilyinykh|     Design Engineer|  Male|  Indonesia|   Pontiac|
|125-85-9610|     Allin|  Tomsett|Sales Representative|  Male|Philippines|       GMC|
|880-05-2062|    Dillon|  Middiff|       VP Accounting|  Male|   Thailand|Volkswagen|
|586-02-1203|      Troy|   Genney|     Sales Associate|  Male|      Yemen|   Mercury|
|328-44-4724|   Marlene|     Musk|     Staff Scientist|Female|   Tanzania|   Pontiac|
|562-53-8334|   Yalonda| Craister| Clinical Specialist|Female|  Indonesia|  Cadillac|
|499-57-3069|     Hewie|  Mammatt|Analog Circuit De...

In [18]:
finaljoined=joined.join(csvfile3, "SSN", how = "inner")

In [19]:
finaljoined.columns

['SSN',
 'first_name',
 'last_name',
 'job',
 'gender',
 'Country',
 'carmake',
 'NHS_NUMBER',
 'SEQ',
 'OUT_DATE',
 'IN_DATE',
 'ICD9']

In [20]:
finaljoined.describe()

DataFrame[summary: string, SSN: string, first_name: string, last_name: string, job: string, gender: string, Country: string, carmake: string, NHS_NUMBER: string, SEQ: string, OUT_DATE: string, IN_DATE: string, ICD9: string]

In [21]:
finaljoined=finaljoined.drop('job','carmake','ICD9')
finaljoined.show(10)

+-----------+----------+---------+------+-----------+----------+---+----------+----------+
|        SSN|first_name|last_name|gender|    Country|NHS_NUMBER|SEQ|  OUT_DATE|   IN_DATE|
+-----------+----------+---------+------+-----------+----------+---+----------+----------+
|661-98-8505|Ermengarde|  Muneely|Female|      China|2161139193|ZLR|18/06/2017|03/03/2016|
|576-77-4107|    Tammie|   Clemow|  Male|       Togo|6281115422|KEC|17/03/2017|25/06/2016|
|194-33-0004|      Yvor| Ilyinykh|  Male|  Indonesia| 383193184|RLF|29/06/2017|23/11/2015|
|125-85-9610|     Allin|  Tomsett|  Male|Philippines|3532442140|GEA|07/12/2018|16/10/2015|
|880-05-2062|    Dillon|  Middiff|  Male|   Thailand| 842221204|BWM|12/02/2017|16/01/2016|
|586-02-1203|      Troy|   Genney|  Male|      Yemen|9495731291|UFK|24/03/2017|11/02/2015|
|328-44-4724|   Marlene|     Musk|Female|   Tanzania|4557036961|DQK|02/05/2017|25/09/2016|
|562-53-8334|   Yalonda| Craister|Female|  Indonesia| 392632683|CRO|13/08/2017|21/05/2016|

In [28]:
from pyspark.sql.functions import levenshtein,datediff,regexp_replace,date_format

In [29]:
finaljoined.filter(finaljoined.Country.rlike('ia$')).show()

+-----------+----------+--------------+------+---------+----------+---+----------+----------+
|        SSN|first_name|     last_name|gender|  Country|NHS_NUMBER|SEQ|  OUT_DATE|   IN_DATE|
+-----------+----------+--------------+------+---------+----------+---+----------+----------+
|156-73-4507|    Hakeem|       Coronas|  Male|Indonesia| 971184089|AVD|11/07/2018|26/05/2016|
|744-09-2748| Archibald|De la Perrelle|  Male| Mongolia|2899736272|VBI|14/03/2018|01/12/2016|
|547-88-4595|      Kati|        Caunce|Female|  Bolivia| 407364048|KWK|24/07/2017|17/03/2015|
|671-13-2456|    Mollie|       Cleland|Female| Tanzania|4550866209|YFW|18/12/2018|30/01/2016|
|749-03-7928|    Sammie|       Bunnell|  Male|   Russia|1145740634|PKL|15/10/2017|16/05/2016|
|444-94-9646|    Nichol|       Carillo|Female|Indonesia|6751698364|YXF|11/09/2018|18/08/2015|
|813-10-3202|    Hersch|      Foxcroft|  Male| Colombia|6521005383|QYY|17/06/2018|02/07/2015|
|652-95-1276|  Starlene|        Surmon|Female|Indonesia| 789

In [30]:
finaljoined.withColumn('NHS_REDACTED', regexp_replace('NHS_NUMBER', '(\d)', '-')).show()

+-----------+----------+---------+------+-------------+----------+---+----------+----------+------------+
|        SSN|first_name|last_name|gender|      Country|NHS_NUMBER|SEQ|  OUT_DATE|   IN_DATE|NHS_REDACTED|
+-----------+----------+---------+------+-------------+----------+---+----------+----------+------------+
|661-98-8505|Ermengarde|  Muneely|Female|        China|2161139193|ZLR|18/06/2017|03/03/2016|  ----------|
|576-77-4107|    Tammie|   Clemow|  Male|         Togo|6281115422|KEC|17/03/2017|25/06/2016|  ----------|
|194-33-0004|      Yvor| Ilyinykh|  Male|    Indonesia| 383193184|RLF|29/06/2017|23/11/2015|   ---------|
|125-85-9610|     Allin|  Tomsett|  Male|  Philippines|3532442140|GEA|07/12/2018|16/10/2015|  ----------|
|880-05-2062|    Dillon|  Middiff|  Male|     Thailand| 842221204|BWM|12/02/2017|16/01/2016|   ---------|
|586-02-1203|      Troy|   Genney|  Male|        Yemen|9495731291|UFK|24/03/2017|11/02/2015|  ----------|
|328-44-4724|   Marlene|     Musk|Female|     

In [31]:
finaljoined = finaljoined.withColumn('OUT-DATE', regexp_replace('OUT_DATE', '/', '-'))
finaljoined = finaljoined.withColumn('IN-DATE', regexp_replace('IN_DATE', '/', '-'))

finaljoined.show()



+-----------+----------+---------+------+-------------+----------+---+----------+----------+----------+----------+
|        SSN|first_name|last_name|gender|      Country|NHS_NUMBER|SEQ|  OUT_DATE|   IN_DATE|  OUT-DATE|   IN-DATE|
+-----------+----------+---------+------+-------------+----------+---+----------+----------+----------+----------+
|661-98-8505|Ermengarde|  Muneely|Female|        China|2161139193|ZLR|18/06/2017|03/03/2016|18-06-2017|03-03-2016|
|576-77-4107|    Tammie|   Clemow|  Male|         Togo|6281115422|KEC|17/03/2017|25/06/2016|17-03-2017|25-06-2016|
|194-33-0004|      Yvor| Ilyinykh|  Male|    Indonesia| 383193184|RLF|29/06/2017|23/11/2015|29-06-2017|23-11-2015|
|125-85-9610|     Allin|  Tomsett|  Male|  Philippines|3532442140|GEA|07/12/2018|16/10/2015|07-12-2018|16-10-2015|
|880-05-2062|    Dillon|  Middiff|  Male|     Thailand| 842221204|BWM|12/02/2017|16/01/2016|12-02-2017|16-01-2016|
|586-02-1203|      Troy|   Genney|  Male|        Yemen|9495731291|UFK|24/03/2017

In [22]:
finaljoined=finaljoined.drop('IN_DATE','OUT_DATE')
finaljoined.show()

+-----------+----------+---------+------+-------------+----------+---+
|        SSN|first_name|last_name|gender|      Country|NHS_NUMBER|SEQ|
+-----------+----------+---------+------+-------------+----------+---+
|661-98-8505|Ermengarde|  Muneely|Female|        China|2161139193|ZLR|
|576-77-4107|    Tammie|   Clemow|  Male|         Togo|6281115422|KEC|
|194-33-0004|      Yvor| Ilyinykh|  Male|    Indonesia| 383193184|RLF|
|125-85-9610|     Allin|  Tomsett|  Male|  Philippines|3532442140|GEA|
|880-05-2062|    Dillon|  Middiff|  Male|     Thailand| 842221204|BWM|
|586-02-1203|      Troy|   Genney|  Male|        Yemen|9495731291|UFK|
|328-44-4724|   Marlene|     Musk|Female|     Tanzania|4557036961|DQK|
|562-53-8334|   Yalonda| Craister|Female|    Indonesia| 392632683|CRO|
|499-57-3069|     Hewie|  Mammatt|  Male|     Colombia|6574054974|SQY|
|350-12-2717|    Tulley|  Chaytor|  Male|        China|8116323385|GZK|
|628-23-6037|  Bellanca|  Walthew|Female|        China| 347316093|MSQ|
|557-7

In [52]:
crrm = finaljoined.select('first_name').\
selectExpr("first_name as name1")

crrf = finaljoined.select('first_name').\
selectExpr("first_name as name2")

crrj=crrm.crossJoin(crrf)
crrj.show()

+----------+----------+
|     name1|     name2|
+----------+----------+
|Ermengarde|Ermengarde|
|Ermengarde|    Tammie|
|Ermengarde|      Yvor|
|Ermengarde|     Allin|
|Ermengarde|    Dillon|
|Ermengarde|      Troy|
|Ermengarde|   Marlene|
|Ermengarde|   Yalonda|
|Ermengarde|     Hewie|
|Ermengarde|    Tulley|
|Ermengarde|  Bellanca|
|Ermengarde|     Kayle|
|Ermengarde| Aleksandr|
|Ermengarde|    Hughie|
|Ermengarde|     Gladi|
|Ermengarde|     Ringo|
|Ermengarde|     Alvie|
|Ermengarde|     Ginni|
|Ermengarde|  Lowrance|
|Ermengarde|       Loy|
+----------+----------+
only showing top 20 rows



In [53]:
crrj.count()

1000000