<a href="https://colab.research.google.com/github/nwelter1/week9-data/blob/master/Operators68_PySpark_Lecture_2021.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# PySpark Introduction

In [None]:
# Environment bootstrap for a hosted runtime: Python packages first, then a JDK.
# Neither pip install is version-pinned, so each run installs whatever is current.
!pip install pyspark
# -U upgrades PyDrive if it is already present; -q keeps pip's output quiet.
!pip install -U -q PyDrive
# Refresh the apt package index before installing the JDK.
!apt-get update
# Spark runs on the JVM, so a JDK has to be available; this installs headless OpenJDK 8.
!apt install openjdk-8-jdk-headless -qq
import os
# Tell Spark which JVM to use.
# NOTE(review): presumably this is where the package above installs on this image — confirm on other runtimes.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Collecting pyspark
  Downloading pyspark-3.1.2.tar.gz (212.4 MB)
[K     |████████████████████████████████| 212.4 MB 64 kB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 61.8 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.2-py2.py3-none-any.whl size=212880768 sha256=b2113068d5e11e6b4a25247d8f22e71ffb5665c8dc8c4e31f708d29a85cf26fd
  Stored in directory: /root/.cache/pip/wheels/a5/0a/c1/9561f6fecb759579a7d863dcd846daaa95f598744e71b02c77
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.2
Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Get:2 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Ign:3 https://developer.download.nvidia.com/compute/cuda/repos

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf


In [None]:
# Column names that the cells below refer to repeatedly.
AGE_MIDPOINT = "age_midpoint"
SALARY_MIDPOINT = "salary_midpoint"
SALARY_MIDPOINT_BUCKET = "salary_midpoint_bucket"

# Start the Spark session for this notebook, or reuse one that is already active.
spark = (
    SparkSession.builder
    .appName('Operators68Spark')
    .getOrCreate()
)

# PySpark DataFrame Reader

In [None]:
# Keep a handle on the session's DataFrameReader, the entry point for loading files into DataFrames.
dataFrameReader = spark.read

# Read In Stack Overflow Data

In [None]:
# Load the survey CSV: the first row supplies the column names, and column
# types are inferred from the data instead of defaulting to strings.
survey_responses = (
    dataFrameReader
    .option("header", "true")
    .option("inferSchema", True)
    .csv('./stack_overflow_responses.csv')
)

# Display Dataset Schema

This printout shows which columns the dataset contains and, more importantly, the data type stored in each column.

In [None]:
print("=== Print Out Schema ===")
# printSchema() writes the column tree (name, type, nullability) to stdout.
survey_responses.printSchema()

=== Print Out Schema ===
root
 |-- _c0: integer (nullable = true)
 |-- collector: string (nullable = true)
 |-- country: string (nullable = true)
 |-- un_subregion: string (nullable = true)
 |-- so_region: string (nullable = true)
 |-- age_range: string (nullable = true)
 |-- age_midpoint: double (nullable = true)
 |-- gender: string (nullable = true)
 |-- self_identification: string (nullable = true)
 |-- occupation: string (nullable = true)
 |-- occupation_group: string (nullable = true)
 |-- experience_range: string (nullable = true)
 |-- experience_midpoint: double (nullable = true)
 |-- salary_range: string (nullable = true)
 |-- salary_midpoint: double (nullable = true)
 |-- big_mac_index: double (nullable = true)
 |-- tech_do: string (nullable = true)
 |-- tech_want: string (nullable = true)
 |-- aliens: string (nullable = true)
 |-- programming_ability: double (nullable = true)
 |-- employment_status: string (nullable = true)
 |-- industry: string (nullable = true)
 |-- company

# The Select Statement in PySpark




In [None]:
# Narrow the survey down to the four columns used in the rest of the walkthrough.
responseWithSelectedColumns = survey_responses.select(
    "country",
    "occupation",
    AGE_MIDPOINT,
    SALARY_MIDPOINT,
)

print("=== Printing the selected cols ===")
# show() prints a preview of the DataFrame (or selection) as a text table.
responseWithSelectedColumns.show()

=== Printing the selected cols ===
+-----------+--------------------+------------+---------------+
|    country|          occupation|age_midpoint|salary_midpoint|
+-----------+--------------------+------------+---------------+
|Afghanistan|                null|        22.0|           null|
|Afghanistan|Mobile developer ...|        32.0|        45000.0|
|Afghanistan|                null|        null|           null|
|Afghanistan|              DevOps|        null|         5000.0|
|Afghanistan|                null|        65.0|           null|
|Afghanistan|                null|        22.0|           null|
|Afghanistan|       Growth hacker|        null|       210000.0|
|Afghanistan|Back-end web deve...|        27.0|         5000.0|
|    Albania|                null|        27.0|           null|
|    Albania|Back-end web deve...|        22.0|         5000.0|
|    Albania|Full-stack web de...|        27.0|         5000.0|
|    Albania|Full-stack web de...|        22.0|        15000.0|
|    

# Filtration in PySpark

In [None]:
# Build the filtered DataFrame once and keep it as `query` for later use.
# Similar to the pandas idiom df[df['colname'] == value].
query = responseWithSelectedColumns.filter(
    responseWithSelectedColumns["country"] == "Afghanistan"
)

print('=== Print Records where the response is from Afghanistan ===')
query.show()


=== Print Records where the response is from Afghanistan ===
+-----------+--------------------+------------+---------------+
|    country|          occupation|age_midpoint|salary_midpoint|
+-----------+--------------------+------------+---------------+
|Afghanistan|                null|        22.0|           null|
|Afghanistan|Mobile developer ...|        32.0|        45000.0|
|Afghanistan|                null|        null|           null|
|Afghanistan|              DevOps|        null|         5000.0|
|Afghanistan|                null|        65.0|           null|
|Afghanistan|                null|        22.0|           null|
|Afghanistan|       Growth hacker|        null|       210000.0|
|Afghanistan|Back-end web deve...|        27.0|         5000.0|
+-----------+--------------------+------------+---------------+



# Aggregations in PySpark

In [None]:
print('=== Print the count of occupations ===')

# One group per distinct occupation value.
groupedData = responseWithSelectedColumns.groupby("occupation")

# orderBy is the Spark counterpart of pandas' sort_values; largest groups first.
(
    groupedData
    .count()
    .orderBy('count', ascending=False)
    .show()
)

=== Print the count of occupations ===
+--------------------+-----+
|          occupation|count|
+--------------------+-----+
|Full-stack web de...|  498|
|                null|  297|
|             Student|  234|
|Back-end web deve...|  221|
|Front-end web dev...|  102|
|   Desktop developer|   87|
|               other|   67|
|    Mobile developer|   60|
|Executive (VP of ...|   46|
|Enterprise level ...|   41|
|Mobile developer ...|   35|
|System administrator|   34|
|Embedded applicat...|   33|
|Mobile developer ...|   32|
|              DevOps|   29|
|Developer with a ...|   25|
|      Data scientist|   24|
| Engineering manager|   22|
|             Analyst|   20|
|     Product manager|   18|
+--------------------+-----+
only showing top 20 rows



# Filtering for values below a threshold in a column in PySpark -- conditionals

In [None]:
print('=== Print records with average mid age less than 20 ===')

# Keep only rows whose age midpoint is strictly below 20.
(
    responseWithSelectedColumns
    .filter(responseWithSelectedColumns[AGE_MIDPOINT] < 20)
    .show()
)

=== Print records with average mid age less than 20 ===
+---------+--------------------+------------+---------------+
|  country|          occupation|age_midpoint|salary_midpoint|
+---------+--------------------+------------+---------------+
|  Algeria|             Student|        16.0|           null|
|  Algeria|Back-end web deve...|        16.0|           null|
|Argentina|             Student|        16.0|         5000.0|
|Argentina|Back-end web deve...|        16.0|         5000.0|
|  Armenia|Back-end web deve...|        16.0|         5000.0|
|  Armenia|                null|        16.0|           null|
|  Armenia|Mobile developer ...|        16.0|         5000.0|
|  Armenia|Mobile developer ...|        16.0|         5000.0|
|  Austria|Mobile developer ...|        16.0|           null|
|  Austria|Full-stack web de...|        16.0|           null|
|  Austria|Full-stack web de...|        16.0|        15000.0|
|  Austria|                null|        16.0|           null|
|  Austria|   

# Sorting by the Salary Mid-point in descending order

In [None]:
print('=== Print the result by salary mid point in descending order ===')

# Sort so the highest salary midpoints come first.
(
    responseWithSelectedColumns
    .orderBy(responseWithSelectedColumns[SALARY_MIDPOINT].desc())
    .show()
)

=== Print the result by salary mid point in descending order ===
+------------------+--------------------+------------+---------------+
|           country|          occupation|age_midpoint|salary_midpoint|
+------------------+--------------------+------------+---------------+
|         Argentina|Back-end web deve...|        32.0|       210000.0|
|           Denmark|              DevOps|        44.5|       210000.0|
|         Argentina|Full-stack web de...|        27.0|       210000.0|
|           Denmark|Enterprise level ...|        32.0|       210000.0|
|Dominican Republic|Executive (VP of ...|        37.0|       210000.0|
|             China|Machine learning ...|        22.0|       210000.0|
|            France|Full-stack web de...|        32.0|       210000.0|
|           Denmark|Full-stack web de...|        22.0|       210000.0|
|       Afghanistan|       Growth hacker|        null|       210000.0|
|          Bulgaria|Enterprise level ...|        37.0|       195000.0|
|           

# Examples of Group By in PySpark

In [None]:
print('=== Group By the country and aggregate by the average salary mid point ===')

# One group per country.
dataCountries = responseWithSelectedColumns.groupBy('country')

# avg() names its result column 'avg(<column>)'; sort on that column, highest average first.
(
    dataCountries
    .avg(SALARY_MIDPOINT)
    .orderBy('avg(salary_midpoint)', ascending=False)
    .show()
)

=== Group By the country and aggregate by the average salary mid point ===
+-----------+--------------------+
|    country|avg(salary_midpoint)|
+-----------+--------------------+
|     Belize|            125000.0|
|    Bahamas|             95000.0|
|    Denmark|   68768.65671641791|
|  Australia|    66891.8918918919|
|Afghanistan|             66250.0|
|      China|             54687.5|
|     Canada|  52586.206896551725|
|    Estonia|  51666.666666666664|
|    Germany|  46491.228070175435|
|    Belgium|   45989.01098901099|
|    Finland|   45714.28571428572|
|    Austria|   45465.11627906977|
|    Bahrain|             45000.0|
|      Chile|  41666.666666666664|
|    Andorra|             40000.0|
|    Ecuador|             40000.0|
|     France|  39648.760330578516|
|   Colombia|   33571.42857142857|
|    Algeria|             30000.0|
| Costa Rica|             30000.0|
+-----------+--------------------+
only showing top 20 rows



In [None]:
# Add the bucket column by casting the salary midpoint to an integer.
# NOTE(review): despite the name, no bucket width is applied here — the cast only
# changes the type (e.g. 45000.0 -> 45000). Confirm whether coarser buckets were intended.
responseWithSalaryBucket = survey_responses.withColumn(
    SALARY_MIDPOINT_BUCKET,
    survey_responses[SALARY_MIDPOINT].cast("integer"),
)

print("=== With Salary Bucket Column ===")
# Show the original and derived columns side by side.
(
    responseWithSalaryBucket
    .select(SALARY_MIDPOINT, SALARY_MIDPOINT_BUCKET)
    .show()
)

print('\n', '=== Group By Salary Bucket ===')
# Count respondents per bucket value, most common first.
(
    responseWithSalaryBucket
    .groupBy(SALARY_MIDPOINT_BUCKET)
    .count()
    .orderBy('count', ascending=False)
    .show()
)

=== With Salary Bucket Column ===
+---------------+----------------------+
|salary_midpoint|salary_midpoint_bucket|
+---------------+----------------------+
|           null|                  null|
|        45000.0|                 45000|
|           null|                  null|
|         5000.0|                  5000|
|           null|                  null|
|           null|                  null|
|       210000.0|                210000|
|         5000.0|                  5000|
|           null|                  null|
|         5000.0|                  5000|
|         5000.0|                  5000|
|        15000.0|                 15000|
|         5000.0|                  5000|
|         5000.0|                  5000|
|        15000.0|                 15000|
|           null|                  null|
|           null|                  null|
|           null|                  null|
|           null|                  null|
|        15000.0|                 15000|
+---------------+------

# Joins In PySpark

In [None]:
# Makerspace survey data; its 'Postcode' column holds full postcodes such as "SE15 3SN".
makerSpace = spark.read.option('header', 'true').csv('./uk-makerspaces-indentifiable-data.csv')

# Postcode lookup table; its 'PostCode' column holds only the leading district
# part of a postcode (e.g. "AB1", "AB10").
#
# A trailing space is appended to every district code so that a prefix
# (startswith) match against a full postcode stops at the end of the district:
# "AB1 " is a prefix of "AB1 2CD" but not of "AB10 1JR". Without the delimiter,
# "AB1" also matches every AB10-AB19 postcode, which duplicates makerspaces
# across districts and inflates per-region counts.
postCode = (
    spark.read
    .option('header', 'true')
    .csv('./uk-postcode.csv')
    .withColumn('PostCode', concat(col('PostCode'), lit(' ')))
)

print('=== Print first 30 records of makespace table')
makerSpace.select('Name of makerspace', 'Postcode').show(30)

print('\n','=== Print first 30 records of postcode table ===')
# The PostCode values printed here now carry the trailing space added above.
postCode.select('PostCode', 'Region').show(30)

=== Print first 30 records of makespace table
+--------------------+--------+
|  Name of makerspace|Postcode|
+--------------------+--------+
|        Hub Workshop|SE15 3SN|
|Nottingham Hacksp...| NG3 1JH|
|         Farset Labs|BT12 5GH|
|       Medway Makers| ME4 3JE|
|             fizzPop|  B5 5SR|
|South London Make...|SE24 9AA|
|Create Space London | HA9 6DE|
|          FounderHub|CF10 1DY|
|  LuneLab Makerspace| LA2 6ND|
|            The Shed| CT2 7NF|
|      Build Brighton| BN2 4AB|
|           Makespace| CB2 1RX|
|   Swansea Hackspace| SA1 1DP|
|57North (previous...|AB11 5BN|
|        BEC Fab Lab |CA13 0HT|
|   Dundee MakerSpace| DD1 4QB|
|                EPIK| CT3 4GP|
|Fab Lab Nerve Centre|BT48 6HJ|
|  fablab@strathclyde|  G1 1XJ|
|MakerspaceFY1 (Bl...| FY1 4DY|
|   piel view hackers|LA13 9BD|
| Leicester Hackspace| LE1 1SB|
| Potteries Hackspace| ST5 2HN|
| Newport Makers Club|NP20 1HG|
|RARA Cooperative ...|  E5 9ND|
|            Open Hub| DY1 3PD|
|Lancaster And Mor...| LA1

Now that we have the data that we want, let's join the two dataframes!

In [None]:
# Pair each makerspace with every postcode row whose PostCode value is a prefix
# of the makerspace's full Postcode. The join type is inner; other accepted
# values include outer, left_outer and right_outer.
# NOTE(review): with plain prefix matching, a short district code also matches
# longer ones (e.g. "AB1" is a prefix of "AB10 1JR") unless the PostCode values
# end with a delimiter — verify the counts below against that.
joined = makerSpace.join(
    postCode,
    on=makerSpace['Postcode'].startswith(postCode['PostCode']),
    how='Inner',
)

print('=== Group By Region ===')
# Count matched rows per region, largest first; show up to 500 rows.
(
    joined
    .groupBy('Region')
    .count()
    .orderBy('count', ascending=False)
    .show(500)
)

=== Group By Region ===
+--------------------+-----+
|              Region|count|
+--------------------+-----+
|             Belfast|    5|
|             Cardiff|    5|
|            Aberdeen|    4|
|       Tower Hamlets|    4|
|   Brighton and Hove|    4|
|           Southwark|    3|
|             Glasgow|    3|
|          Manchester|    3|
|             Newport|    2|
|             Lambeth|    2|
|              Oxford|    2|
|             Hackney|    2|
|             Bristol|    2|
|         Southampton|    2|
|       Milton Keynes|    2|
|           Liverpool|    2|
|            Bradford|    2|
|            Cornwall|    2|
|Richmond upon Thames|    2|
|               Leeds|    2|
|           Lancaster|    2|
|              Camden|    2|
|                York|    2|
|           Sheffield|    2|
|              Stroud|    1|
|           Guildford|    1|
|          Sunderland|    1|
|    Scottish Borders|    1|
|              Dundee|    1|
|          Wandsworth|    1|
|       Staffordshi

In [None]:
# creating a pandas DF based on PySparkDF
# toPandas() collects the entire joined result onto the driver as a pandas
# DataFrame, so it should only be used when the result fits in local memory.
pandasDF = joined.toPandas()
# Bare last expression: the notebook renders the frame as a table.
# NOTE(review): the rendered table includes the 'Email address' column — consider
# selecting a subset of columns before displaying or sharing this notebook.
pandasDF

Unnamed: 0,Timestamp,Collected by,Name of makerspace,Email address,Postcode,Date your space opened (or plans to open),Date your space closed (if it is no longer running),cost to member/user per month,Sculpture,Do you have a materials shop?,Do you keep a list of members or regular users?,cluster type by size,Have you adopted a code of conduct?,Gender balance of members or regular users [Female],Gender balance of members or regular users [Male],Gender balance of members or regular users [Other],Gender balance of members or regular users [Prefer not to say],What ethnic groups are represented amongst your membership? [White],What ethnic groups are represented amongst your membership? [Mixed or multiple ethnic groups],What ethnic groups are represented amongst your membership? [Asian or Asian British],What ethnic groups are represented amongst your membership? [Other ethnic group],Total visits in November 2014,Unique users in November 2014,User types in November 2014 [Visitors or observers],User types in November 2014 [Hobbyist],User types in November 2014 [Startups],User types in November 2014 [Sole traders/Microbusinesses],User types in November 2014 [SMEs],User types in November 2014 [Students],User types in November 2014 [Teachers],User types in November 2014 [Other / don't know],Purpose of user visits in November 2014 [Introduction to making],Purpose of user visits in November 2014 [Make something specific],Purpose of user visits in November 2014 [Prototype],Purpose of user visits in November 2014 [Make one-off pieces],Purpose of user visits in November 2014 [Small batch production],Purpose of user visits in November 2014 [Socialise],Purpose of user visits in November 2014 [Other (detail below)],Do you produce accounts?,PostCode,Latitude,Longitude,Easting,Northing,GridRef,Town/Area,Region,Postcodes,Active postcodes,Population,Households
0,15-Jan-15,Makerspace. Edited by Researcher,57North (previously Hackerdeen),contact@57north.co,AB11 5BN,5/10/13,,20,,No,Yes,small,No,<10%,80-89%,10-19%,<10%,90-100%,,,,50-100,10 to 50,< 10%,80-89%,,,,< 10%,,< 10%,<10%,<10%,90-100%,,,90-100%,,Yes,AB1,57.1269,-2.13644,391839,804005,NJ918040,Aberdeen,Aberdeen,2655,0,,
1,8-Mar-15,Makerspace. Edited by Researcher,Make Aberdeen,info@make-aberdeen.com,AB10 1JR,,,,,,,unknown,,,,,,,,,,,,,,,,,,,,,,,,,,,,AB1,57.1269,-2.13644,391839,804005,NJ918040,Aberdeen,Aberdeen,2655,0,,
2,8-Mar-15,Makerspace. Edited by Researcher,Make Aberdeen,info@make-aberdeen.com,AB10 1JR,,,,,,,unknown,,,,,,,,,,,,,,,,,,,,,,,,,,,,AB10,57.1348,-2.11748,392988,804882,NJ929048,"Aberdeen city centre, Bridge of Dee, Mannofield",Aberdeen,888,675,21964,11517
3,15-Jan-15,Makerspace. Edited by Researcher,57North (previously Hackerdeen),contact@57north.co,AB11 5BN,5/10/13,,20,,No,Yes,small,No,<10%,80-89%,10-19%,<10%,90-100%,,,,50-100,10 to 50,< 10%,80-89%,,,,< 10%,,< 10%,<10%,<10%,90-100%,,,90-100%,,Yes,AB11,57.1371,-2.09341,394445,805136,NJ944051,"Aberdeen city centre, Torry",Aberdeen,889,644,21237,10926
4,8-Jan-15,Makerspace,fizzPop,fizzpop.makers@gmail .com,B5 5SR,18/07/2013,,Oct-40,,No,Yes,medium,No,10-19%,70-79%,,,70-79%,,,,,,,,,,,,,Don't know,,,,,,,Don't know,Yes,B5,52.4722,-1.89687,407102,286034,SP071860,"Digbeth, Highgate, Lee Bank",Birmingham,869,392,12156,5139
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
115,4-Feb-15,Makerspace. Edited by Researcher,MakeSpace at the Institute of Making,hello@instituteofmaking.org.uk,WC1E 7JE,16/12/2012,,0,,No,Yes,large,Yes,40-49%,50-59%,,,,,,,5000+,,30-39%,< 10%,< 10%,,,30-39%,20-29%,,40-49%,,30-39%,20-29%,,10-19%,,Yes,WC1E,51.5207,-0.132408,529671,181851,TQ296818,University College London,Camden,291,111,2256,770
116,25-Feb-15,Makerspace. Edited by Researcher,Makerversity,christina@makerversity.org,WC2R 1LA,11/9/13,,,,No,Yes,large,No,,,,,,,,,100-250,10 to 50,,,,,,,,,,,,,,,,Yes,WC2R,51.5122,-0.118399,530667,180931,TQ306809,Somerset House,Westminster,404,118,138,53
117,21-Jan-15,Makerspace. Edited by Researcher,Leigh Hackspace,info@leighhack.org,WN7 1DR,31/01/2015,,,,No,Yes,small,No,<10%,90-100%,,,90-100%,<10%,<10%,<10%,0-10,10 to 50,10-19%,< 10%,< 10%,< 10%,< 10%,< 10%,10-19%,< 10%,10-19%,50-59%,10-19%,50-59%,<10%,80-89%,,Yes,WN7,53.4973,-2.51804,365732,400191,SD657001,"Hope Carr, Landside, Leigh, Low Common, Bedfor...",Wigan,1423,1134,47823,21097
118,14-Feb-15,Makerspace,York Hackspace,makers@york.hackspace.org.uk,YO23 1AB,16/09/2011,,,,No,No,small,No,10-19%,80-89%,,,90-100%,,,,0-10,0-10,< 10%,90-100%,< 10%,,,< 10%,,,<10%,,,<10%,,90-100%,,No,YO2,53.9484,-1.12164,457746,450612,SE577506,York,York,1699,0,,
