Creating DataFrames from an RDD programmatically

Import findspark and initialize it.
Then import pyspark.

In [23]:
# findspark.init() must run before `import pyspark`: it makes the local Spark
# installation importable from this kernel, so the order of these lines matters.
import findspark
findspark.init()
import pyspark

Start a SparkSession

In [24]:
from pyspark.sql import SparkSession
# Build the session entry point; getOrCreate() hands back an already-running
# session when one exists instead of starting a second one.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL example")
    .getOrCreate()
)

In [25]:
# Grab the SparkContext behind the session; it is used below to create the RDD.
sc = spark.sparkContext

Create an RDD from the structured text file

In [26]:
# RDD of raw text lines from the customers file; the path is relative to the
# notebook's working directory. Each line is split on tabs further down.
clines = sc.textFile("customers.tsv")

Import the type classes from pyspark.sql.types so that a StructType schema can be built

In [27]:
from pyspark.sql.types import *

In [28]:
# Split each line on tabs, then keep the first five fields of every record as
# a tuple. A line with fewer than five fields raises IndexError when the RDD
# is evaluated.
cfields = clines.map(lambda line: line.split("\t"))
customers = cfields.map(lambda fields: tuple(fields[i] for i in range(5)))

The schema is encoded in a string of space-separated column names.

In [29]:
# Column names, separated by whitespace, in the same order as the five fields
# kept from each input line.
schemaString = "cid cname ccity cstate czip"

In [30]:
# One nullable, string-typed field per name listed in schemaString; the list
# of fields is then wrapped in a StructType to form the DataFrame schema.
ccolumns = [
    StructField(field_name, StringType(), nullable=True)
    for field_name in schemaString.split()
]
schema = StructType(ccolumns)

In [31]:
# Sanity check: the fields are held in a plain Python list.
type(ccolumns)

list

Apply the schema to the RDD to create the DataFrame

In [32]:
# Pair the RDD of five-field tuples with the explicit schema to get a DataFrame.
customerDF = spark.createDataFrame(data=customers, schema=schema)

In [33]:
# Confirm the column names, types and nullability that were applied.
customerDF.printSchema()

root
 |-- cid: string (nullable = true)
 |-- cname: string (nullable = true)
 |-- ccity: string (nullable = true)
 |-- cstate: string (nullable = true)
 |-- czip: string (nullable = true)



In [34]:
# Project a single column, referenced by its name as a string.
# show() prints only the first 20 rows.
customerDF.select("cname").show()

+----------------+
|           cname|
+----------------+
|     Mary Torres|
|      Jose Haley|
|      Mary Smith|
|  Richard Maddox|
|  Margaret Booth|
|  Mary Henderson|
|     Lisa Walker|
|   Jonathan Hill|
|Carolyn Sheppard|
|    Mary Mendoza|
|   Michael Smith|
|    James Holmes|
|     Mary Dawson|
|    Adam Marquez|
|    Gloria Smith|
|       Mary Webb|
|  Nancy Alvarado|
|  Russell Flores|
|    Denise Smith|
|  Jose Dickerson|
+----------------+
only showing top 20 rows



In [35]:
# Project two columns, this time referenced as Column objects obtained by
# indexing the DataFrame instead of as plain strings.
customerDF.select(customerDF['cname'], customerDF['ccity']).show()

+----------------+-------------+
|           cname|        ccity|
+----------------+-------------+
|     Mary Torres|       Caguas|
|      Jose Haley|     Columbus|
|      Mary Smith|      Houston|
|  Richard Maddox|       Caguas|
|  Margaret Booth|    Arlington|
|  Mary Henderson|       Caguas|
|     Lisa Walker|       Caguas|
|   Jonathan Hill|      Phoenix|
|Carolyn Sheppard|Pompano Beach|
|    Mary Mendoza|       Caguas|
|   Michael Smith|       Caguas|
|    James Holmes|     Hilliard|
|     Mary Dawson|       Caguas|
|    Adam Marquez|  San Antonio|
|    Gloria Smith|       Caguas|
|       Mary Webb|   San Marcos|
|  Nancy Alvarado|     Flushing|
|  Russell Flores|       Caguas|
|    Denise Smith|    Rego Park|
|  Jose Dickerson|         Mesa|
+----------------+-------------+
only showing top 20 rows



In [36]:
# Keep only the rows whose cstate column equals 'CA'.
customerDF.filter(customerDF['cstate'] == 'CA').show()

+-----+----------------+---------------+------+-----+
|  cid|           cname|          ccity|cstate| czip|
+-----+----------------+---------------+------+-----+
| 5577|      Mary Smith|        Modesto|    CA|95350|
| 1745|      Mary Smith|Rowland Heights|    CA|91748|
|11444|Kathleen Patrick|      San Diego|    CA|92109|
| 8846|    Thomas Smith|          Indio|    CA|92201|
| 6237|  Bobby Anderson|       El Cajon|    CA|92020|
| 4085|       Mary Carr|  Panorama City|    CA|91402|
| 8705|  Patricia Smith|       Stockton|    CA|95207|
| 3669|       Mary Soto| San Bernardino|    CA|92410|
| 6101|      Mary Smith|    Los Angeles|    CA|90033|
|11697|  Jessica Thomas|  Laguna Niguel|    CA|92677|
| 1295|   Theresa Lopez|       Winnetka|    CA|91306|
| 4814|     Paul Suarez|    Simi Valley|    CA|93065|
| 8530|   William Smith|       Highland|    CA|92346|
| 3846|    Ronald Lewis|        Ontario|    CA|91764|
|10476|     John Hodges|       Cerritos|    CA|90703|
|10243|  Donna Anderson|    

In [37]:
# Number of customer rows per state; the result has columns cstate and count.
customerDF.groupBy("cstate").count().show()

+------+-----+
|cstate|count|
+------+-----+
|    AZ|   19|
|    SC|    2|
|    LA|    7|
|    MN|    1|
|    NJ|   19|
|    DC|    4|
|    OR|    4|
|    VA|   14|
|    RI|    2|
|    KY|    1|
|    MI|   28|
|    NV|   16|
|    WI|    9|
|    ID|    2|
|    CA|  187|
|    CT|    8|
|    NC|   19|
|    MD|   19|
|    DE|    1|
|    MO|   13|
+------+-----+
only showing top 20 rows



Create a temp view so that SQL queries can be run

In [38]:
# Register the DataFrame under the name "customers" so spark.sql() queries can
# refer to it as a table; re-running the cell replaces the existing view.
customerDF.createOrReplaceTempView("customers")

In [None]:
# States with at least 50 customers. The adjacent string literals are joined
# by Python into one SQL statement before it reaches Spark.
cStateCount50 = spark.sql(
    "SELECT cstate, count(*) as sttcount "
    "FROM customers "
    "GROUP BY cstate "
    "HAVING sttcount>=50"
)

In [None]:
# Display the states that passed the sttcount>=50 threshold.
cStateCount50.show()

In [None]:
# Inspect the columns produced by the SQL query (cstate and the sttcount alias).
cStateCount50.printSchema()

In [None]:
# Inspect the Python type of the object returned by spark.sql().
type(cStateCount50)