In [1]:

from os.path import abspath
from pyspark.sql import SparkSession

# Default storage location for managed databases and tables; abspath resolves
# 'spark-warehouse' against the current working directory.
warehouse_location = abspath('spark-warehouse')

# Build (or reuse, via getOrCreate) a Hive-enabled SparkSession that runs
# locally with one worker thread.
spark = (
    SparkSession.builder
    .appName("SparkSession")
    .master("local[1]")
    .config("spark.sql.warehouse.dir", warehouse_location)
    .enableHiveSupport()
    .getOrCreate()
)

22/05/13 21:51:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:

# Read the flights CSV into a Spark DataFrame: the first row supplies the
# column names and Spark infers each column's type from the data.
flights_df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('data/flights_small.csv')
)

# NOTE(review): several numeric-looking columns (dep_time, dep_delay, arr_time,
# arr_delay, air_time, hour, minute) are reported as string in the schema
# printed below — presumably the file contains a non-numeric missing-value
# marker; confirm and consider the CSV reader's nullValue option.
flights_df.printSchema()

                                                                                

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)



In [3]:
# Preview the first 5 rows as a text table
flights_df.show(5)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|   12|  8|     658|       -7|     935|       -5|     VX| N846VA|  1780|   SEA| LAX|     132|     954|   6|    58|
|2014|    1| 22|    1040|        5|    1505|        5|     AS| N559AS|   851|   SEA| HNL|     360|    2677|  10|    40|
|2014|    3|  9|    1443|       -2|    1652|        2|     VX| N847VA|   755|   SEA| SFO|     111|     679|  14|    43|
|2014|    4|  9|    1705|       45|    1839|       34|     WN| N360SW|   344|   PDX| SJC|      83|     569|  17|     5|
|2014|    3|  9|     754|       -1|    1015|        1|     AS| N612AS|   522|   SEA| BUR|     127|     937|   7|    54|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+

In [4]:
# Confirm that the CSV reader returned a Spark DataFrame
type(flights_df)

pyspark.sql.dataframe.DataFrame

In [5]:

# Convert from Spark df to pandas df.
# NOTE: toPandas() collects every row into the driver's memory, so it should
# only be used when the result is expected to be small.
flights_pd = flights_df.toPandas()

In [6]:
# Preview the first five rows of the pandas copy
flights_pd.head()

Unnamed: 0,year,month,day,dep_time,dep_delay,arr_time,arr_delay,carrier,tailnum,flight,origin,dest,air_time,distance,hour,minute
0,2014,12,8,658,-7,935,-5,VX,N846VA,1780,SEA,LAX,132,954,6,58
1,2014,1,22,1040,5,1505,5,AS,N559AS,851,SEA,HNL,360,2677,10,40
2,2014,3,9,1443,-2,1652,2,VX,N847VA,755,SEA,SFO,111,679,14,43
3,2014,4,9,1705,45,1839,34,WN,N360SW,344,PDX,SJC,83,569,17,5
4,2014,3,9,754,-1,1015,1,AS,N612AS,522,SEA,BUR,127,937,7,54


In [7]:
# Confirm that the converted object is a pandas DataFrame
type(flights_pd)

pandas.core.frame.DataFrame

In [8]:

# Convert the pandas DataFrame back to a Spark DataFrame.
# createDataFrame() returns a DataFrame object bound to this Python variable;
# it is not registered as a table or view in the SparkSession catalog.
spark_temp_df = spark.createDataFrame(flights_pd)
type(spark_temp_df)

pyspark.sql.dataframe.DataFrame

In [9]:

# Because the newly created Spark DataFrame is not registered in the catalog,
# it cannot be referenced by name from other contexts such as SQL queries.
# Listing the catalog's tables confirms that it is absent.
spark.catalog.listTables()

22/05/13 21:53:30 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
22/05/13 21:53:30 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
22/05/13 21:53:32 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
22/05/13 21:53:32 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore nm@192.168.1.143
22/05/13 21:53:33 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException


[Table(name='flights', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='flights_df_tbl', database='default', description=None, tableType='MANAGED', isTemporary=False)]

In [11]:

# To access the new Spark DataFrame from other contexts (e.g. SQL), register it
# as a temporary view. createTempView() raises an error if a view with the same
# name already exists, whereas createOrReplaceTempView() replaces the existing
# view, which keeps this cell safe to re-run.
# The view is then available in the current SparkSession and is listed with
# isTemporary=True.
spark_temp_df.createOrReplaceTempView("temp_tbl")
spark.catalog.listTables()

[Table(name='flights', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='flights_df_tbl', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='temp_tbl', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]