## Step 1: Create spark context

In [1]:
import pyspark
spark = pyspark.sql.SparkSession.builder.getOrCreate()
sc = spark.sparkContext

In [2]:
spark

In [3]:
sc

## Step 2: Create a pandas dataframe

In [4]:
import pandas as pd
p_df = pd.read_csv('data/users.csv', sep='|')

In [5]:
p_df.head()

Unnamed: 0,1,24,M,technician,85711
0,2,53,F,other,94043
1,3,23,M,writer,32067
2,4,24,M,technician,43537
3,5,33,F,other,15213
4,6,42,M,executive,98101


## Step 3: Create spark dataframe

In [6]:
rdd = sc.textFile("data/users.csv").map(lambda line: line.split("|"))

In [8]:
rdd.take(5)

[['1', '24', 'M', 'technician', '85711'],
 ['2', '53', 'F', 'other', '94043'],
 ['3', '23', 'M', 'writer', '32067'],
 ['4', '24', 'M', 'technician', '43537'],
 ['5', '33', 'F', 'other', '15213']]

In [9]:
df = spark.read.csv("data/users.csv", sep="|")

In [11]:
type(rdd)

pyspark.rdd.PipelinedRDD

In [10]:
type(df)

pyspark.sql.dataframe.DataFrame

In [12]:
df.take(5)

[Row(_c0='1', _c1='24', _c2='M', _c3='technician', _c4='85711'),
 Row(_c0='2', _c1='53', _c2='F', _c3='other', _c4='94043'),
 Row(_c0='3', _c1='23', _c2='M', _c3='writer', _c4='32067'),
 Row(_c0='4', _c1='24', _c2='M', _c3='technician', _c4='43537'),
 Row(_c0='5', _c1='33', _c2='F', _c3='other', _c4='15213')]

In [13]:
# it's already a DF, but this is the easy way to rename columns
df = (spark.read.csv("data/users.csv", sep="|")
           .toDF("id", "age", "gender", "occupation", "zip"))

In [14]:
df.take(5)

[Row(id='1', age='24', gender='M', occupation='technician', zip='85711'),
 Row(id='2', age='53', gender='F', occupation='other', zip='94043'),
 Row(id='3', age='23', gender='M', occupation='writer', zip='32067'),
 Row(id='4', age='24', gender='M', occupation='technician', zip='43537'),
 Row(id='5', age='33', gender='F', occupation='other', zip='15213')]

In [15]:
df.schema

StructType(List(StructField(id,StringType,true),StructField(age,StringType,true),StructField(gender,StringType,true),StructField(occupation,StringType,true),StructField(zip,StringType,true)))

In [23]:
(
    df.where("occupation != 'other'")
      .groupby("occupation")
      .count()
      .sort("count", ascending=0)
      .show(5)
)

+-------------+-----+
|   occupation|count|
+-------------+-----+
|      student|  196|
|     educator|   95|
|administrator|   79|
|     engineer|   67|
|   programmer|   66|
+-------------+-----+
only showing top 5 rows



In [24]:
df.persist()

DataFrame[id: string, age: string, gender: string, occupation: string, zip: string]

In [28]:
# df.limit(5).toPandas()
# df.head(5)
df.show(5)

+---+---+------+----------+-----+
| id|age|gender|occupation|  zip|
+---+---+------+----------+-----+
|  1| 24|     M|technician|85711|
|  2| 53|     F|     other|94043|
|  3| 23|     M|    writer|32067|
|  4| 24|     M|technician|43537|
|  5| 33|     F|     other|15213|
+---+---+------+----------+-----+
only showing top 5 rows



In [30]:
def show(df, n=5):
    return df.limit(n).toPandas()

In [31]:
import pyspark.sql.functions as F
df.agg(F.countDistinct('occupation')).show()

+--------------------------+
|count(DISTINCT occupation)|
+--------------------------+
|                        21|
+--------------------------+



In [34]:
query = """
SELECT occupation, COUNT(*) as count
FROM users
GROUP BY occupation
ORDER BY count DESC
"""
df.createOrReplaceTempView('users')
output = spark.sql(query)
output.show(10)

+-------------+-----+
|   occupation|count|
+-------------+-----+
|      student|  196|
|        other|  105|
|     educator|   95|
|administrator|   79|
|     engineer|   67|
|   programmer|   66|
|    librarian|   51|
|       writer|   45|
|    executive|   32|
|    scientist|   31|
+-------------+-----+
only showing top 10 rows

