In [1]:
# Display the SparkContext. `sc` is not created in any visible cell of this
# notebook, so it is presumably injected by the PySpark kernel/shell — TODO confirm.
sc

### import

In [5]:
from pyspark.sql import SQLContext
import json
# Wrap the existing SparkContext in a SQLContext; the cells below use it for
# createDataFrame and read.json.
# NOTE(review): newer Spark releases favour SparkSession over SQLContext —
# confirm against the Spark version in use before migrating.
sqlCtx = SQLContext(sc)

### DataFrame creation 1 - `createDataFrame` on an RDD of parsed JSON

In [12]:
# Pipeline: JSON file -> RDD of text lines -> RDD of dicts -> DataFrame.
# Every line of the file is parsed on its own with json.loads, and
# createDataFrame infers the schema from the resulting dicts (the nested
# `models` dict shows up as map<string,string> in the output below).

inputJson = sc.textFile("./cars.json").map(json.loads)
jsonRDD = inputJson
cars = sqlCtx.createDataFrame(jsonRDD)

cars

DataFrame[brand: string, models: map<string,string>]

In [14]:
# Inspect the schema inferred for the DataFrame.
cars.printSchema()

root
 |-- brand: string (nullable = true)
 |-- models: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



In [15]:
# Print the rows as a text table (long cell values are truncated in the output).
cars.show()

+-----+--------------------+
|brand|              models|
+-----+--------------------+
| Ford|{name -> Fiesta, ...|
| Ford|{name -> Focus, p...|
| Ford|{name -> Mustang,...|
|  BMW|{name -> 320, pri...|
|  BMW|{name -> X3, pric...|
|  BMW|{name -> X5, pric...|
| Fiat|{name -> 500, pri...|
+-----+--------------------+



In [16]:
# Fetch the first record as a Row; here `models` comes back as a plain dict.
cars.first()

Row(brand='Ford', models={'name': 'Fiesta', 'price': '14260'})

### DataFrame creation 2 - `read.json` on the SQLContext

In [17]:
# Let Spark's JSON reader load the file directly. With this path `models`
# is inferred as a struct with `name` and `price` fields (see the repr below).
cars = sqlCtx.read.json("./cars.json")

# Expose the DataFrame to SQL queries under the name "cars".
# createOrReplaceTempView is the supported replacement for registerTempTable,
# which has been deprecated since Spark 2.0 and only forwards to this method.
cars.createOrReplaceTempView("cars")

cars

DataFrame[brand: string, models: struct<name:string,price:string>]

In [18]:
# Schema inferred by the JSON reader: `models` is a struct here, not a map.
cars.printSchema()

root
 |-- brand: string (nullable = true)
 |-- models: struct (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- price: string (nullable = true)



In [19]:
# Print the rows as a text table; struct values are shown as {name, price}.
cars.show()

+-----+----------------+
|brand|          models|
+-----+----------------+
| Ford| {Fiesta, 14260}|
| Ford|  {Focus, 18825}|
| Ford|{Mustang, 26670}|
|  BMW|    {320, 40250}|
|  BMW|     {X3, 41000}|
|  BMW|     {X5, 60700}|
| Fiat|    {500, 16495}|
+-----+----------------+



In [20]:
# Fetch the first record; the nested struct comes back as a nested Row.
cars.first()

Row(brand='Ford', models=Row(name='Fiesta', price='14260'))

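What changed? With `read.json`, `models` is inferred as a `struct<name:string,price:string>` (a nested `Row`) instead of the `map<string,string>` (a `dict`) produced by `createDataFrame`.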

## DataFrame calc

DataFrame operations are similar to SQL!

In [21]:
# Project a single top-level column.
cars.select("brand").show()

+-----+
|brand|
+-----+
| Ford|
| Ford|
| Ford|
|  BMW|
|  BMW|
|  BMW|
| Fiat|
+-----+



In [22]:
# Select a field nested inside the `models` struct using dot notation.
cars.select("models.price").show()

+-----+
|price|
+-----+
|14260|
|18825|
|26670|
|40250|
|41000|
|60700|
|16495|
+-----+



In [23]:
# Build a new, flat DataFrame:
#   1. expand the nested `models` struct so `name` and `price` sit beside `brand`
#   2. change the type of `price` from string to integer

from pyspark.sql.types import IntegerType

cars_flatten = cars.select("brand", "models.*")
price_as_int = cars_flatten["price"].cast(IntegerType())
cars_flatten = cars_flatten.withColumn("price", price_as_int)

cars_flatten.show()

+-----+-------+-----+
|brand|   name|price|
+-----+-------+-----+
| Ford| Fiesta|14260|
| Ford|  Focus|18825|
| Ford|Mustang|26670|
|  BMW|    320|40250|
|  BMW|     X3|41000|
|  BMW|     X5|60700|
| Fiat|    500|16495|
+-----+-------+-----+



In [24]:
# Keep only the rows whose price is greater than 20000.

cars_flatten.filter(cars_flatten["price"] > 20000).show()

+-----+-------+-----+
|brand|   name|price|
+-----+-------+-----+
| Ford|Mustang|26670|
|  BMW|    320|40250|
|  BMW|     X3|41000|
|  BMW|     X5|60700|
+-----+-------+-----+



In [25]:
# Count the number of rows (models) per brand.
cars_flatten.groupBy("brand").count().show()

+-----+-----+
|brand|count|
+-----+-----+
|  BMW|    3|
| Fiat|    1|
| Ford|    3|
+-----+-----+

