In [1]:
import pyspark
from pyspark.sql import SparkSession #Importing the Libraries
# Creating Spark Session
spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
# Reading /loading the Dataset from CSV file
cardf = spark.read.load("C:\\Users\\SGKsk\\Downloads\\cars.csv", format="csv", header = True,inferSchema = True)

In [3]:
#see the types of columns in DataFrame
cardf.printSchema()

root
 |-- Car: string (nullable = true)
 |-- MPG: double (nullable = true)
 |-- Cylinders: integer (nullable = true)
 |-- Displacement: double (nullable = true)
 |-- Horsepower: integer (nullable = true)
 |-- Weight: integer (nullable = true)
 |-- Acceleration: double (nullable = true)
 |-- Model: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- city: string (nullable = true)



In [3]:
#column names
cardf.columns

['Car',
 'MPG',
 'Cylinders',
 'Displacement',
 'Horsepower',
 'Weight',
 'Acceleration',
 'Model',
 'Origin',
 'quantity',
 'city']

In [4]:
#dataset
cardf.show()

+--------------------+----+---------+------------+----------+------+------------+-----+------+--------+-------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|quantity|   city|
+--------------------+----+---------+------------+----------+------+------------+-----+------+--------+-------+
|AMC Ambassador Br...|13.0|        8|       360.0|       175|  3821|        11.0|   73|    US|      25|NewYork|
|  AMC Ambassador DPL|15.0|        8|       390.0|       190|  3850|         8.5|   70|    US|       2|     NJ|
|  AMC Ambassador SST|17.0|        8|       304.0|       150|  3672|        11.5|   72|    US|       4| DALLAS|
|         AMC Concord|19.4|        6|       232.0|        90|  3210|        17.2|   78|    US|      52|  TEXAS|
|         AMC Concord|24.3|        4|       151.0|        90|  3003|        20.1|   80|    US|      42|     OH|
|     AMC Concord d/l|18.1|        6|       258.0|       120|  3410|        15.1|   78|    US|       4|N

In [5]:
#head
cardf.head()

Row(Car='AMC Ambassador Brougham', MPG=13.0, Cylinders=8, Displacement=360.0, Horsepower=175, Weight=3821, Acceleration=11.0, Model=73, Origin='US', quantity=25, city='NewYork')

In [6]:
#last 2 rows
cardf.tail(2)

[Row(Car='Volvo 264gl', MPG=17.0, Cylinders=6, Displacement=163.0, Horsepower=125, Weight=3140, Acceleration=13.6, Model=78, Origin='Europe', quantity=320, city='NewYork'),
 Row(Car='Volvo Diesel', MPG=30.7, Cylinders=6, Displacement=145.0, Horsepower=76, Weight=3160, Acceleration=19.6, Model=81, Origin='Europe', quantity=406, city='NJ')]

In [7]:
len(cardf.columns)  # show number of columns

11

In [8]:
cardf.columns     # show name of the columns

['Car',
 'MPG',
 'Cylinders',
 'Displacement',
 'Horsepower',
 'Weight',
 'Acceleration',
 'Model',
 'Origin',
 'quantity',
 'city']

In [9]:
cardf.describe('Car').show()

+-------+--------------------+
|summary|                 Car|
+-------+--------------------+
|  count|                 406|
|   mean|                NULL|
| stddev|                NULL|
|    min|AMC Ambassador Br...|
|    max|        Volvo Diesel|
+-------+--------------------+



In [10]:
cardf.orderBy(cardf.Acceleration.desc()).show(10)

+--------------------+----+---------+------------+----------+------+------------+-----+------+--------+-------+
|                 Car| MPG|Cylinders|Displacement|Horsepower|Weight|Acceleration|Model|Origin|quantity|   city|
+--------------------+----+---------+------------+----------+------+------------+-----+------+--------+-------+
|         Peugeot 504|27.2|        4|       141.0|        71|  3190|        24.8|   79|Europe|     344|NewYork|
|   Volkswagen Pickup|44.0|        4|        97.0|        52|  2130|        24.6|   82|Europe|      96|NewYork|
|Volkswagen Dasher...|43.4|        4|        90.0|        48|  2335|        23.7|   80|Europe|     371| DALLAS|
|   Volkswagen Type 3|23.0|        4|        97.0|        54|  2254|        23.5|   72|Europe|     104|NewYork|
|  Chevrolet Chevette|29.0|        4|        85.0|        52|  2035|        22.2|   76|    US|     240|  TEXAS|
|Oldsmobile Cutlas...|23.9|        8|       260.0|        90|  3420|        22.2|   79|    US|     345|N

Example 2 Creating the DataFrame from JSON File

In [4]:
#read json file into dataframe
df = spark.read.json("C:\\Users\\SGKsk\\Downloads\\zipcode.json")
df.printSchema()
df.show()


root
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Decommisioned: boolean (nullable = true)
 |-- EstimatedPopulation: long (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Location: string (nullable = true)
 |-- LocationText: string (nullable = true)
 |-- LocationType: string (nullable = true)
 |-- Long: double (nullable = true)
 |-- Notes: string (nullable = true)
 |-- RecordNumber: long (nullable = true)
 |-- State: string (nullable = true)
 |-- TaxReturnsFiled: long (nullable = true)
 |-- TotalWages: long (nullable = true)
 |-- WorldRegion: string (nullable = true)
 |-- Xaxis: double (nullable = true)
 |-- Yaxis: double (nullable = true)
 |-- Zaxis: double (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- Zipcode: long (nullable = true)

+-------------------+-------+-------------+-------------------+-----+--------------------+--------------------+--------------+-------+-------------+------------+-----+---------------+-------

In [5]:
#read multiline json file into dataframe

multiline_df = spark.read.option("multiline","true") \
      .json("C:\\Users\\SGKsk\\Downloads\\multiline-zipcode.json")
multiline_df.show()


+-------------------+------------+-----+-----------+-------+
|               City|RecordNumber|State|ZipCodeType|Zipcode|
+-------------------+------------+-----+-----------+-------+
|PASEO COSTA DEL SUR|           2|   PR|   STANDARD|    704|
|       BDA SAN LUIS|          10|   PR|   STANDARD|    709|
+-------------------+------------+-----+-----------+-------+



In [6]:
#read multiple JSON files into df
df2 = spark.read.json(
    ["C:\\Users\\SGKsk\\Downloads\\zipcode2.json","C:\\Users\\SGKsk\\Downloads\\zipcode1.json"])
df2.show(4)   


+-------------------+-------+-------------+-----+--------------------+--------------------+--------------+------+------------+-----+-----------+-----+-----+-----+-----------+-------+
|               City|Country|Decommisioned|  Lat|            Location|        LocationText|  LocationType|  Long|RecordNumber|State|WorldRegion|Xaxis|Yaxis|Zaxis|ZipCodeType|Zipcode|
+-------------------+-------+-------------+-----+--------------------+--------------------+--------------+------+------------+-----+-----------+-----+-----+-----+-----------+-------+
|PASEO COSTA DEL SUR|     US|        false|17.96|NA-US-PR-PASEO CO...|Paseo Costa Del S...|NOT ACCEPTABLE|-66.22|           2|   PR|         NA| 0.38|-0.87|  0.3|   STANDARD|    704|
|       BDA SAN LUIS|     US|        false|18.14|NA-US-PR-BDA SAN ...|    Bda San Luis, PR|NOT ACCEPTABLE|-66.26|          10|   PR|         NA| 0.38|-0.86| 0.31|   STANDARD|    709|
|        PARC PARQUE|     US|        false|17.96|NA-US-PR-PARC PARQUE|     Parc Parqu

In [7]:
#read all json files from directory
df3 = spark.read.json("C:/Users/SGKsk/Downloads/*.json")
df3.show()

#stopped bc took too long

In [2]:
#define custom schema for zipcode.json bc zipcode.json files have no schema info

from pyspark.sql.types import StructType,StructField, StringType, IntegerType,BooleanType,DoubleType
schema = StructType([
      StructField("RecordNumber",IntegerType(),True),
      StructField("Zipcode",IntegerType(),True),
      StructField("ZipCodeType",StringType(),True),
      StructField("City",StringType(),True),
      StructField("State",StringType(),True),
      StructField("LocationType",StringType(),True),
      StructField("Lat",DoubleType(),True),
      StructField("Long",DoubleType(),True),
      StructField("Xaxis",IntegerType(),True),
      StructField("Yaxis",DoubleType(),True),
      StructField("Zaxis",DoubleType(),True),
      StructField("WorldRegion",StringType(),True),
      StructField("Country",StringType(),True),
      StructField("LocationText",StringType(),True),
      StructField("Location",StringType(),True),
      StructField("Decommisioned",BooleanType(),True),
      StructField("TaxReturnsFiled",StringType(),True),
      StructField("EstimatedPopulation",IntegerType(),True),
      StructField("TotalWages",IntegerType(),True),
      StructField("Notes",StringType(),True)
  ])

df_with_schema = spark.read.schema(schema).json("C:/Users/SGKsk/Downloads/zipcode.json")
df_with_schema.printSchema()
df_with_schema.show(3)


root
 |-- RecordNumber: integer (nullable = true)
 |-- Zipcode: integer (nullable = true)
 |-- ZipCodeType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- LocationType: string (nullable = true)
 |-- Lat: double (nullable = true)
 |-- Long: double (nullable = true)
 |-- Xaxis: integer (nullable = true)
 |-- Yaxis: double (nullable = true)
 |-- Zaxis: double (nullable = true)
 |-- WorldRegion: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- LocationText: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- Decommisioned: boolean (nullable = true)
 |-- TaxReturnsFiled: string (nullable = true)
 |-- EstimatedPopulation: integer (nullable = true)
 |-- TotalWages: integer (nullable = true)
 |-- Notes: string (nullable = true)

+------------+-------+-----------+-------------------+-----+--------------+-----+------+-----+-----+-----+-----------+-------+--------------------+--------------------+-------

In [3]:
#stop Spark session when done using SparkSQL
spark.stop()