In [1]:
from pyspark import SparkContext, SparkConf, SQLContext

# Local Spark using all cores.
# NOTE(review): in local[*] mode the executors run inside the driver JVM, so
# spark.executor.memory probably has no effect here; spark.driver.memory is the
# setting that sizes the heap — confirm for the Spark version in use.
conf = SparkConf().setAppName('test').set("spark.executor.memory", "2g").setMaster("local[*]")

# getOrCreate makes this cell safe to re-run: constructing a second
# SparkContext in the same kernel raises an error. If a context already
# exists it is reused as-is and `conf` is ignored.
sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)

In [2]:
# Create a DataFrame from a CSV file.
# Uses Spark's built-in CSV source (Spark 2.0+) instead of the legacy
# 'com.databricks.spark.csv' package name. header=True takes column names from
# the first line; inferSchema=True costs an extra pass over the file but yields
# typed columns (see the schema printed below).
# NOTE: absolute path inside the Vagrant VM — adjust for other environments.
csv_filepath = '/jupyter-vagrant/notebook/On_Time_Performance_2013_7.csv'
dataframe = sqlContext.read.csv(csv_filepath, header=True, inferSchema=True)

In [3]:
# Check the column types produced by schema inference (e.g. FlightDate comes
# back as a timestamp and the ID columns as integers).
dataframe.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Quarter: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- FlightDate: timestamp (nullable = true)
 |-- UniqueCarrier: string (nullable = true)
 |-- AirlineID: integer (nullable = true)
 |-- Carrier: string (nullable = true)
 |-- TailNum: string (nullable = true)
 |-- FlightNum: integer (nullable = true)
 |-- OriginAirportID: integer (nullable = true)
 |-- OriginAirportSeqID: integer (nullable = true)
 |-- OriginCityMarketID: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- OriginCityName: string (nullable = true)
 |-- OriginState: string (nullable = true)
 |-- OriginStateFips: integer (nullable = true)
 |-- OriginStateName: string (nullable = true)
 |-- OriginWac: integer (nullable = true)
 |-- DestAirportID: integer (nullable = true)
 |-- DestAirportSeqID: integer (nullable = true)
 |-- DestCityMarketID: integer (nu

In [4]:
# Register the DataFrame as a SQL temporary view so it can be queried with SQL.
# The view is named "table" for compatibility with later queries. TABLE is an
# SQL keyword (reserved when Spark enforces ANSI reserved keywords), so it is
# backtick-quoted in the query to keep parsing unambiguous.
dataframe.createOrReplaceTempView("table")
# NOTE: DISTINCT does not guarantee row order; add ORDER BY for a stable listing.
sql_df = sqlContext.sql("SELECT distinct Year, Carrier FROM `table`")
sql_df.show()

+----+-------+
|Year|Carrier|
+----+-------+
|2013|     UA|
|2013|     WN|
|2013|     B6|
|2013|     AS|
|2013|     HA|
|2013|     DL|
|2013|     OO|
|2013|     MQ|
|2013|     YV|
|2013|     F9|
|2013|     AA|
|2013|     9E|
|2013|     FL|
|2013|     VX|
|2013|     US|
|2013|     EV|
+----+-------+



In [5]:
# Data source types and save modes: write the same DataFrame in each built-in
# format that accepts this schema, so the on-disk sizes can be compared.
# Save modes: "error" (the default), "append", "overwrite", "ignore";
# "overwrite" makes this cell safe to re-run.
#
# Formats not written here:
#   jdbc   - requires a connection URL
#   libsvm - fails on this DataFrame
#   text   - the text data source supports only a single column
#
# Equivalent builder syntax for a single write:
#   dataframe.write.format('parquet').mode('overwrite').save('temp.parquet')
OUTPUT_FORMATS = (
    ("parquet", {}),
    ("json", {}),
    ("orc", {}),
    # Without header=True the column names are dropped from the CSV, so reading
    # it back with header='true' would silently consume the first data row.
    ("csv", {"header": True}),
)
for fmt, fmt_options in OUTPUT_FORMATS:
    dataframe.write.save("temp." + fmt, format=fmt, mode='overwrite', **fmt_options)

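#### Example sizes on disk
The columnar formats (ORC, Parquet) come out roughly an order of magnitude smaller than the row-oriented CSV and JSON outputs.

|Size|Format|
|------|--------|
|247M|Source CSV|
|200M|temp.csv/|
|616M|temp.json/|
|26M|temp.orc/|
|17M|temp.parquet/|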


In [6]:
# Alternatively read other data sources: load back the copies written above.
# Each result gets its own name instead of reusing `dataframe`; reassigning one
# variable three times left only the last read reachable and replaced the
# CSV-loaded original (hidden state on re-run of earlier cells).
# NOTE(review): the JSON copy likely will not carry the original types exactly
# (JSON has no timestamp type; numbers are re-inferred) — check with printSchema().
parquet_df = sqlContext.read.parquet("temp.parquet")
json_df = sqlContext.read.json("temp.json")
orc_df = sqlContext.read.orc("temp.orc")