<H1>PySpark Basics

In [1]:
from pyspark import SparkContext
from pyspark.sql.session import SparkSession

In [2]:
# Reuse the active SparkContext if there is one; otherwise start a new one.
sc = SparkContext.getOrCreate(conf=None)

In [3]:
my_list = [1, 2, 3, 4]
# Square every element (plain Python, no Spark involved yet).
squared_my_list = [value * value for value in my_list]
squared_my_list

[1, 4, 9, 16]

In [4]:
# Keep only the odd elements of my_list.
filtered_my_list = [value for value in my_list if value % 2 != 0]
filtered_my_list

[1, 3]

In [5]:
# Distribute the integers 0..99 as an RDD.
numbers = list(range(100))
numbersRDD = sc.parallelize(numbers)
type(numbersRDD)

pyspark.rdd.RDD

<H2>Transformations and Actions

In [6]:
# Cube each element on the cluster (transformation), then bring the results
# back to the driver with collect() (action).
numRDD = sc.parallelize(my_list)
cubeRDD = numRDD.map(lambda x: x**3)
numbers_all = cubeRDD.collect()

# Print with a plain loop; a list comprehension would build a throwaway list of
# None values purely for its side effects.
for num in numbers_all:
    print(num)

1
8
27
64


In [7]:
# Sum the values that share a key, e.g. key 1 has values 2 and 4 -> (1, 6).
pairRDD = sc.parallelize([(1, 2), (3, 4), (1, 4), (3, 2), (4, 2)])
pairRDD_Reduced = pairRDD.reduceByKey(lambda x, y: x + y)

# Print with a loop rather than a side-effect-only list comprehension.
for pair in pairRDD_Reduced.collect():
    print(pair)

(4, 2)
(1, 6)
(3, 6)


In [8]:
# Sort the reduced pairs by key in ascending order, then print them one per line.
pairRDD_Reduced_sorted = pairRDD_Reduced.sortByKey(ascending=True)
for pair in pairRDD_Reduced_sorted.collect():
    print(pair)

(1, 6)
(3, 6)
(4, 2)


In [9]:
# countByKey() is an action: it returns a driver-side dict mapping each key to
# the number of pairs with that key.
total = pairRDD.countByKey()
total

defaultdict(int, {1: 2, 3: 2, 4: 1})

In [10]:
# Check the concrete container type returned by countByKey().
type(total)

collections.defaultdict

In [11]:
# View the (key, count) pairs.
total.items()

dict_items([(1, 2), (3, 2), (4, 1)])

In [12]:
# Report how many pairs each key had; use the singular "value" for a count of 1.
for key, n_values in total.items():
    print(key, "has", n_values, "value" if n_values == 1 else "values")

1 has 2 values
3 has 2 values
4 has 1 values


<H2>Spark Session

In [13]:
# Use the documented builder pattern instead of constructing SparkSession
# directly; getOrCreate() returns the existing session if there is one and
# otherwise builds one on the already-active SparkContext.
spark = SparkSession.builder.getOrCreate()

sample_list = [("Razi", 39), ("Ali", 37), ("Naqi", 33)]
sampleRDD = sc.parallelize(sample_list)
# `schema` supplies only the column names; column types are inferred from the data.
df_sample = spark.createDataFrame(sampleRDD, schema=["Name", "Age"])
type(df_sample)

pyspark.sql.dataframe.DataFrame

In [14]:
# Render the DataFrame as a text table (up to 20 rows, long values truncated).
df_sample.show(n=20, truncate=True)

+----+---+
|Name|Age|
+----+---+
|Razi| 39|
| Ali| 37|
|Naqi| 33|
+----+---+



In [19]:
# Load people.csv: the first row holds column names and Spark infers column types.
df_people = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("people.csv")
)

In [20]:
# Preview the first 20 rows of the loaded data.
df_people.show(n=20)

+---+---------+-----------------+------+-------------+
|_c0|person_id|             name|   sex|date of birth|
+---+---------+-----------------+------+-------------+
|  0|      100|   Penelope Lewis|female|   1990-08-31|
|  1|      101|    David Anthony|  male|   1971-10-14|
|  2|      102|        Ida Shipp|female|   1962-05-24|
|  3|      103|     Joanna Moore|female|   2017-03-10|
|  4|      104|   Lisandra Ortiz|female|   2020-08-05|
|  5|      105|    David Simmons|  male|   1999-12-30|
|  6|      106|    Edward Hudson|  male|   1983-05-09|
|  7|      107|     Albert Jones|  male|   1990-09-13|
|  8|      108| Leonard Cavender|  male|   1958-08-08|
|  9|      109|   Everett Vadala|  male|   2005-05-24|
| 10|      110| Freddie Claridge|  male|   2002-05-07|
| 11|      111|Annabelle Rosseau|female|   1989-07-13|
| 12|      112|    Eulah Emanuel|female|   1976-01-19|
| 13|      113|       Shaun Love|  male|   1970-05-26|
| 14|      114|Alejandro Brennan|  male|   1980-12-22|
| 15|     

In [22]:
# Print the column names and the types inferred from the CSV.
df_people.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- person_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- date of birth: string (nullable = true)



In [23]:
# Total number of rows (an action that scans the data).
df_people.count()

100000

In [24]:
# List the column names, read from the DataFrame's schema.
df_people.schema.names

['_c0', 'person_id', 'name', 'sex', 'date of birth']

In [27]:
# Keep only the descriptive columns and preview the first 10 rows.
wanted_columns = ["name", "sex", "date of birth"]
df_people_sub = df_people.select(*wanted_columns)
df_people_sub.show(n=10)

+----------------+------+-------------+
|            name|   sex|date of birth|
+----------------+------+-------------+
|  Penelope Lewis|female|   1990-08-31|
|   David Anthony|  male|   1971-10-14|
|       Ida Shipp|female|   1962-05-24|
|    Joanna Moore|female|   2017-03-10|
|  Lisandra Ortiz|female|   2020-08-05|
|   David Simmons|  male|   1999-12-30|
|   Edward Hudson|  male|   1983-05-09|
|    Albert Jones|  male|   1990-09-13|
|Leonard Cavender|  male|   1958-08-08|
|  Everett Vadala|  male|   2005-05-24|
+----------------+------+-------------+
only showing top 10 rows



In [29]:
# `_c0` appears to be a row index written out with the CSV (0, 1, 2, ...).
# If it is part of the comparison, every row is distinct and real duplicates
# can never be found, so compare rows on every other column instead.
# Passing a subset still keeps all columns (including `_c0`) in the result.
dedup_columns = [c for c in df_people.columns if c != "_c0"]
df_people_nodups = df_people.dropDuplicates(subset=dedup_columns)
print("Before Dropping Duplicates", df_people.count())
print("After Dropping Duplicates", df_people_nodups.count())

Below Dropping Duplicates 100000
After Dropping Duplicates 100000


In [30]:
# Split people by the `sex` column. Rows where sex is null match neither
# condition, so they appear in neither result.
is_female = df_people["sex"] == "female"
is_male = df_people["sex"] == "male"
df_people_females = df_people.filter(is_female)
df_people_males = df_people.filter(is_male)

In [31]:
# Number of rows whose sex is exactly "female".
df_people_females.count()

49014

In [32]:
# Number of rows whose sex is exactly "male".
df_people_males.count()

49066

In [34]:
# Count rows per value of `sex`; null values form their own group.
df_people_sex = df_people.groupBy(df_people["sex"])
sex_counts = df_people_sex.count()
sex_counts.show()

+------+-----+
|   sex|count|
+------+-----+
|  null| 1920|
|female|49014|
|  male|49066|
+------+-----+



In [35]:
# Oldest people first. "date of birth" is a string column, but its yyyy-MM-dd
# format sorts in chronological order.
df_people.sort(df_people["date of birth"].asc()).show()

+-----+---------+---------------+------+-------------+
|  _c0|person_id|           name|   sex|date of birth|
+-----+---------+---------------+------+-------------+
|57359|    57459|   Sharon Perez|female|   1899-08-28|
|62233|    62333|Martina Morison|female|   1901-04-21|
|96318|    96418|   Lisa Garrett|female|   1901-05-09|
|39703|    39803|    Naomi Davis|female|   1902-04-25|
|64563|    64663|  Brenda French|female|   1902-07-27|
|32026|    32126|   Tyler Walton|  male|   1903-07-14|
|38717|    38817|  Daniel Naiman|  male|   1903-11-07|
|  361|      461| Christy Dawson|female|   1904-01-11|
|42760|    42860|   John Merritt|  male|   1906-11-04|
|12763|    12863|   Roger Watkin|  male|   1907-12-08|
| 6477|     6577|   Marie Givens|female|   1908-02-15|
|52436|    52536|Maribel Donahoe|female|   1908-11-27|
|   85|      185|    Paula Evans|female|   1909-02-10|
|85976|    86076|     Tim Makris|  male|   1909-07-11|
|36967|    37067|   Joyce Jacoby|female|   1909-09-13|
|32878|   

In [36]:
# Replace the space-containing column name with one usable unquoted in SQL.
old_name, new_name = "date of birth", "DOB"
df_people = df_people.withColumnRenamed(old_name, new_name)
df_people.show()

+---+---------+-----------------+------+----------+
|_c0|person_id|             name|   sex|       DOB|
+---+---------+-----------------+------+----------+
|  0|      100|   Penelope Lewis|female|1990-08-31|
|  1|      101|    David Anthony|  male|1971-10-14|
|  2|      102|        Ida Shipp|female|1962-05-24|
|  3|      103|     Joanna Moore|female|2017-03-10|
|  4|      104|   Lisandra Ortiz|female|2020-08-05|
|  5|      105|    David Simmons|  male|1999-12-30|
|  6|      106|    Edward Hudson|  male|1983-05-09|
|  7|      107|     Albert Jones|  male|1990-09-13|
|  8|      108| Leonard Cavender|  male|1958-08-08|
|  9|      109|   Everett Vadala|  male|2005-05-24|
| 10|      110| Freddie Claridge|  male|2002-05-07|
| 11|      111|Annabelle Rosseau|female|1989-07-13|
| 12|      112|    Eulah Emanuel|female|1976-01-19|
| 13|      113|       Shaun Love|  male|1970-05-26|
| 14|      114|Alejandro Brennan|  male|1980-12-22|
| 15|      115|Robert Mcreynolds|  male|1973-12-27|
| 16|      1

<H2>Using the Spark SQL Module

In [37]:
# Male names and birth dates, oldest first, expressed in Spark SQL.
query = "select name, DOB from people where sex='male' order by DOB"
# Register the DataFrame so the query can refer to it as the table `people`.
df_people.createOrReplaceTempView(name="people")
df_people_names_dob = spark.sql(query)
df_people_names_dob.show()

+-----------------+----------+
|             name|       DOB|
+-----------------+----------+
|     Tyler Walton|1903-07-14|
|    Daniel Naiman|1903-11-07|
|     John Merritt|1906-11-04|
|     Roger Watkin|1907-12-08|
|       Tim Makris|1909-07-11|
|      Jeremy Jost|1910-04-14|
|    Fredrick Nass|1911-01-12|
|       Shaun King|1911-03-27|
|  Mitchell Martin|1911-07-06|
|     Daniel Rutan|1911-08-09|
|Rigoberto Russell|1912-04-10|
|  John Vanderpool|1912-07-25|
|      Marc Maione|1912-09-09|
|   Charles Rustin|1914-01-22|
|  Tyrone Clarkson|1915-03-02|
|     Andy Cameron|1915-12-03|
|  Anthony Gardner|1916-03-12|
|   Wayne Thatcher|1916-05-24|
|    Todd Hilliard|1916-08-16|
|    Joshua Maddox|1916-10-06|
+-----------------+----------+
only showing top 20 rows



<H2>Create an RDD from an External File

In [38]:
# Read the file line by line; with no minimum given, Spark chooses the partition count.
clusterRDD = sc.textFile(name="5000_points.txt")

In [39]:
# Partitions Spark chose when no minimum was requested.
clusterRDD.getNumPartitions()

2

In [40]:
# Re-read the same file, this time requesting at least 5 partitions.
clusterRDD = sc.textFile("5000_points.txt", minPartitions=5)

In [41]:
# Partitions after requesting a minimum of 5.
clusterRDD.getNumPartitions()

5

In [43]:
# Peek at the first 5 raw lines (tab-separated fields).
clusterRDD.take(5)

['664159\t550946',
 '665845\t557965',
 '597173\t575538',
 '618600\t551446',
 '635690\t608046']

In [44]:
def split_on_tab(line):
    """Split one input line into its tab-separated fields."""
    return line.split("\t")

rdd_split = clusterRDD.map(split_on_tab)

In [45]:
# First 5 records after splitting on the tab character (still strings).
rdd_split.take(5)

[['664159', '550946'],
 ['665845', '557965'],
 ['597173', '575538'],
 ['618600', '551446'],
 ['635690', '608046']]

In [46]:
def to_int_pair(fields):
    """Convert the first two string fields of a record to a list of two ints."""
    return [int(fields[0]), int(fields[1])]

rdd_split_int = rdd_split.map(to_int_pair)

In [48]:
# First 5 records as integer pairs.
rdd_split_int.take(5)

[[664159, 550946],
 [665845, 557965],
 [597173, 575538],
 [618600, 551446],
 [635690, 608046]]