In [1]:
from pyspark import SparkContext

In [2]:
# Reuse the SparkContext that is already running, or start one if none exists.
sc = SparkContext.getOrCreate()

Parallelized collections are created by calling SparkContext’s parallelize 
method on an existing iterable or collection in the driver program. The elements 
of the collection are copied to form a distributed dataset that can be operated 
on in parallel. For example, here is how to create a parallelized collection 
holding the numbers 1 to 5:

In [3]:
# Plain Python list living in the driver program.
data = [1, 2, 3, 4, 5]

# Keep `distData` bound to the distributed dataset (RDD) itself rather than to
# the ordinary list that collect() returns, so the name refers to the same kind
# of object here and in the cell that follows.
distData = sc.parallelize(data)

# collect() copies every element back to the driver; fine for five numbers.
distData.collect()

[1, 2, 3, 4, 5]

In [4]:
# Distribute the list, then fold its elements pairwise into a single sum.
distData = sc.parallelize(data)
distData.reduce(lambda total, value: total + value)

15

## DataFrame Operations

In [2]:
from pyspark.sql import SparkSession

# `spark` names the SparkSession only. Do not alias the pyspark module to the
# same identifier: the assignment below would shadow it immediately.
spark = (
    SparkSession.builder
    .appName('Python Spark SQL example')
    # Placeholder key/value pair showing how builder options are passed.
    .config("spark.some.config.option", 'some-value')
    # Returns the existing session when there is one, otherwise builds it.
    .getOrCreate()
)

In [3]:
# Read the JSON file into a DataFrame and print it as a table.
people_path = "people.json"
df = spark.read.json(people_path)
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [4]:
# Print the column names, types and nullability of the DataFrame as a tree.
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [5]:
# Project the DataFrame down to the single `name` column and display it.
names_only = df.select('name')
names_only.show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [6]:
# Column expression: every age incremented by one (a null age stays null).
age_plus_one = df['age'] + 1
df.select(df['name'], age_plus_one).show()

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+



In [7]:
# Keep only the rows whose age is strictly greater than 21.
older_than_21 = df['age'] > 21
df.filter(older_than_21).show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [8]:
# Count how many rows share each distinct age value.
rows_per_age = df.groupBy('age').count()
rows_per_age.show()

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|null|    1|
|  30|    1|
+----+-----+



## SQL Queries

In [12]:
# Expose the DataFrame to SQL under the view name "people", then query it.
df.createOrReplaceTempView("people")

select_all_people = "SELECT * FROM people"
sqlDF = spark.sql(select_all_people)
sqlDF.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [15]:
# Global temporary view: registered once, then addressed through the
# `global_temp` qualifier in SQL.
df.createOrReplaceGlobalTempView("people")

global_people = spark.sql("SELECT * FROM global_temp.people")
global_people.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [16]:
# Query the same global temporary view from a separate, newly created session.
fresh_session = spark.newSession()
fresh_session.sql("SELECT * FROM global_temp.people").show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



## Sample Project

In [22]:
from pyspark.sql import SparkSession

# Describe the session first, then obtain it; getOrCreate() hands back the
# session that is already running when there is one.
session_builder = (
    SparkSession.builder
    .appName('Python Spark SQL example')
    .config("spark.some.config.option", 'some-value')
)
spark = session_builder.getOrCreate()

# The underlying SparkContext gives access to the RDD API used below.
sc = spark.sparkContext

In [23]:
# Read the file as an RDD of text lines and break every line into its
# comma-separated fields (each record becomes a list of strings).
rdd = sc.textFile('housing.data').map(lambda record: record.split(","))

In [24]:
from pyspark.sql import Row


def _housing_row(fields):
    """Wrap one split record in a Row with named housing columns.

    The first nine entries of ``fields`` are assigned by position. Values are
    passed through unchanged: no numeric conversion happens here, so they stay
    whatever type the upstream split produced.
    """
    return Row(
        longitude=fields[0],
        lattitude=fields[1],  # spelling kept as-is: this is the column name
        housingMedianAge=fields[2],
        totalRooms=fields[3],
        totalBedRooms=fields[4],
        population=fields[5],
        households=fields[6],
        medianIncome=fields[7],
        medianHouseValue=fields[8],
    )


# Turn every record into a Row and let Spark infer a DataFrame from them.
df = rdd.map(_housing_row).toDF()

In [25]:
# List the column names of the housing DataFrame.
df.columns

['households',
 'housingMedianAge',
 'lattitude',
 'longitude',
 'medianHouseValue',
 'medianIncome',
 'population',
 'totalBedRooms',
 'totalRooms']

In [26]:
# Number of rows in the housing DataFrame.
df.count()

20640

In [27]:
# Print the leading rows of the housing DataFrame as a text table.
df.show()

+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| households|housingMedianAge|lattitude|  longitude|medianHouseValue|medianIncome| population|totalBedRooms| totalRooms|
+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+
| 126.000000|       41.000000|37.880000|-122.230000|   452600.000000|    8.325200| 322.000000|   129.000000| 880.000000|
|1138.000000|       21.000000|37.860000|-122.220000|   358500.000000|    8.301400|2401.000000|  1106.000000|7099.000000|
| 177.000000|       52.000000|37.850000|-122.240000|   352100.000000|    7.257400| 496.000000|   190.000000|1467.000000|
| 219.000000|       52.000000|37.850000|-122.250000|   341300.000000|    5.643100| 558.000000|   235.000000|1274.000000|
| 259.000000|       52.000000|37.850000|-122.250000|   342200.000000|    3.846200| 565.000000|   280.000000|1627.000000|
| 193.000000|       52.000000|37

In [28]:
# NOTE(review): the wildcard import also rebinds names such as sum/min/max to
# their Spark column versions; narrow it to the names actually needed once
# the usage in the rest of the notebook has been confirmed.
from pyspark.sql.functions import *

# Households per inhabitant, computed row by row and appended as `housePop`.
households_per_person = col('households') / col('population')
df = df.withColumn('housePop', households_per_person)
df.show()

+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+-------------------+
| households|housingMedianAge|lattitude|  longitude|medianHouseValue|medianIncome| population|totalBedRooms| totalRooms|           housePop|
+-----------+----------------+---------+-----------+----------------+------------+-----------+-------------+-----------+-------------------+
| 126.000000|       41.000000|37.880000|-122.230000|   452600.000000|    8.325200| 322.000000|   129.000000| 880.000000|  0.391304347826087|
|1138.000000|       21.000000|37.860000|-122.220000|   358500.000000|    8.301400|2401.000000|  1106.000000|7099.000000| 0.4739691795085381|
| 177.000000|       52.000000|37.850000|-122.240000|   352100.000000|    7.257400| 496.000000|   190.000000|1467.000000|0.35685483870967744|
| 219.000000|       52.000000|37.850000|-122.250000|   341300.000000|    5.643100| 558.000000|   235.000000|1274.000000| 0.3924731182795699|
| 259.000000|