In [1]:
!pip install pyspark
!pip install findspark

Collecting pyspark
  Using cached pyspark-3.5.0-py2.py3-none-any.whl
Collecting py4j==0.10.9.7 (from pyspark)
  Using cached py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.7 pyspark-3.5.0
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [2]:
import findspark
findspark.init()

In [3]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [None]:
# Exercise 1

In [4]:
# Creating a spark context class
sc = SparkContext()

# Creating a spark session
spark = SparkSession.builder.appName("Python Spark DataFrames basic example").config("spark.some.config.option", "some-value").getOrCreate()

In [5]:
spark

In [8]:
data = range(1,30)

print(data[0])
print(len(data))
xrangeRDD = sc.parallelize(data, 4)

xrangeRDD

1
29


PythonRDD[3] at RDD at PythonRDD.scala:53

In [9]:
subRDD = xrangeRDD.map(lambda x: x-1)
filteredRDD = subRDD.filter(lambda x: x<10)

Task 3: Actions

A transformation returns a result to the driver. We now apply the collect() action to get the output from the transformation.

In [10]:
print(filteredRDD.collect())
filteredRDD.count()

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]


10

In [11]:
import time 

test = sc.parallelize(range(1,50000),4)
test.cache()

t1 = time.time()

count1 = test.count()
dt1 = time.time() - t1
print("dt1: ", dt1)

t2 = time.time()
count2 = test.count()
dt2 = time.time() - t2
print("dt2: ", dt2)

dt1:  9.728809356689453
dt2:  4.8059797286987305


In [12]:
test.count()

49999

Exercise 3: DataFrames and SparkSQL

In order to work with the extremely powerful SQL engine in Apache Spark, you will need a Spark Session. We have created that in the first Exercise, let us verify that spark session is still active.

In [13]:
spark

In [14]:
!curl https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/data/people.json >> people.json

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed

  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
  0     0    0     0    0     0      0      0 --:--:--  0:00:01 --:--:--     0
100    73  100    73    0     0     52      0  0:00:01  0:00:01 --:--:--    52


In [15]:
df = spark.read.json("people.json").cache()

In [16]:
df.show()
df.printSchema()

+----+-------+
| age|   name|
+----+-------+
|NULL|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [20]:
a = df.createTempView("peoples")

In [23]:
df.select("name").show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [24]:
df.select(df["name"]).show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [25]:
spark.sql("SELECT name FROM people").show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [26]:
df.filter(df["age"] > 21).show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [27]:
spark.sql("SELECT age, name FROM people WHERE age > 21").show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [28]:
df.groupBy("age").count().show()

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|NULL|    1|
|  30|    1|
+----+-----+



In [31]:
spark.sql("SELECT age FROM people GROUP BY age").show()

+----+
| age|
+----+
|  19|
|  30|
|NULL|
+----+



## Exercise

### Exercise 1

In [32]:
numbers = range(1,50)

numbers_RDD = sc.parallelize(numbers, 4)

even_numbers_RDD = numbers_RDD.map(lambda x: x*2)

even_numbers_RDD = even_numbers_RDD.filter(lambda x: x % 2 == 0)

In [34]:
print(even_numbers_RDD.collect())

[2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98]


In [36]:
numbers = range(1, 100)
numbers_RDD = sc.parallelize(numbers, 4)
numbers_by_two = numbers_RDD.map(lambda x:x*2)
even_num = numbers_by_two.filter(lambda x: x%2==0)
print(even_num.collect())

[2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56, 58, 60, 62, 64, 66, 68, 70, 72, 74, 76, 78, 80, 82, 84, 86, 88, 90, 92, 94, 96, 98, 100, 102, 104, 106, 108, 110, 112, 114, 116, 118, 120, 122, 124, 126, 128, 130, 132, 134, 136, 138, 140, 142, 144, 146, 148, 150, 152, 154, 156, 158, 160, 162, 164, 166, 168, 170, 172, 174, 176, 178, 180, 182, 184, 186, 188, 190, 192, 194, 196, 198]


### Excercise 2

In [37]:
!curl https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/labs/people2.json >> people2.json

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed

  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0
100   136  100   136    0     0    108      0  0:00:01  0:00:01 --:--:--   108
100   136  100   136    0     0    108      0  0:00:01  0:00:01 --:--:--   108


In [39]:
df = spark.read.json("people2.json").cache()
df.createTempView("people2")

In [41]:
spark.sql("SELECT * FROM people2").show()

+---+-------+
|age|   name|
+---+-------+
| 25|Michael|
| 24|   Andy|
| 19| Justin|
| 26| George|
| 30|   Jeff|
+---+-------+



In [47]:
df.select(["name","age"]).show()

+-------+---+
|   name|age|
+-------+---+
|Michael| 25|
|   Andy| 24|
| Justin| 19|
| George| 26|
|   Jeff| 30|
+-------+---+



In [50]:
res = spark.sql("SELECT AVG(age) FROM people2").show()
res

+--------+
|avg(age)|
+--------+
|    24.8|
+--------+



In [51]:
spark.stop()

In [52]:
res