# Creating a SparkSession

In [2]:
# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession

# Create my_spark
my_spark = SparkSession.builder.getOrCreate()

# Print my_spark
print(my_spark)

<pyspark.sql.session.SparkSession object at 0x000001D97F715A20>


# Viewing tables

In [10]:
# Print the tables in the catalog
print(my_spark.catalog.listTables())

[]


# Are you query-ious?

One of the advantages of the DataFrame interface is that you can run SQL queries on the tables in your Spark cluster

In [12]:
# Don't change this query
query = "FROM flights SELECT * LIMIT 10"

# Get the first 10 rows of flights
flights10 = spark.sql(query)

# Show the results
flights10.show()

# Pandafy a Spark DataFrame

In [None]:
# Don't change this query
query = "SELECT origin, dest, COUNT(*) as N FROM flights GROUP BY origin, dest"

# Run the query
flight_counts = spark.sql(query)

# Convert the results to a pandas DataFrame
pd_counts = flight_counts.toPandas()

# Print the head of pd_counts
print(pd_counts.head())

# Put some Spark in your data

In [13]:
import numpy as np
import pandas as pd

# Create pd_temp
pd_temp = pd.DataFrame(np.random.random(10))

# Create spark_temp from pd_temp
spark_temp = my_spark.createDataFrame(pd_temp)

# Examine the tables in the catalog
print(my_spark.catalog.listTables())

# Add spark_temp to the catalog
spark_temp.createOrReplaceTempView('temp')


# Examine the tables in the catalog again
print(my_spark.catalog.listTables())

[]
[Table(name='temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]


# Dropping the middle man


Luckily, your SparkSession has a .read attribute which has several methods for reading different data sources into Spark DataFrames. Using these you can create a DataFrame from a .csv file just like with regular pandas DataFrames!

In [6]:
import os 

file_path = os.path.join(*["./","airport.csv"])

# Read in the airports data
airports = my_spark.read.csv(file_path, header=True)

# Show the data
airports.show()

+---+--------------------+----------------+-----------------+----+---+---+
|faa|                name|             lat|              lon| alt| tz|dst|
+---+--------------------+----------------+-----------------+----+---+---+
|04G|   Lansdowne Airport|      41.1304722|      -80.6195833|1044| -5|  A|
|06A|Moton Field Munic...|      32.4605722|      -85.6800278| 264| -5|  A|
|06C| Schaumburg Regional|      41.9893408|      -88.1012428| 801| -6|  A|
|06N|     Randall Airport|       41.431912|      -74.3915611| 523| -5|  A|
|09J|Jekyll Island Air...|      31.0744722|      -81.4277778|  11| -4|  A|
|0A9|Elizabethton Muni...|      36.3712222|      -82.1734167|1593| -4|  A|
|0G6|Williams County A...|      41.4673056|      -84.5067778| 730| -5|  A|
|0G7|Finger Lakes Regi...|      42.8835647|      -76.7812318| 492| -5|  A|
|0P2|Shoestring Aviati...|      39.7948244|      -76.6471914|1000| -5|  U|
|0S9|Jefferson County ...|      48.0538086|     -122.8106436| 108| -8|  A|
|0W3|Harford County Ai...

In [7]:
type(airports)

pyspark.sql.dataframe.DataFrame

In [8]:
my_spark.catalog.listDatabases()

[Database(name='default', description='default database', locationUri='file:/C:/Users/CAMNG3/Desktop/git_repo_public/datacamp_courses/PySpark/spark-warehouse')]

In [14]:
my_spark.catalog.listTables()

[Table(name='temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

# Manipulating data

# Creating columns

In [17]:
file_path = os.path.join(*["./","flights_small.csv"])

In [18]:
flights = my_spark.read.csv(file_path, header=True)


In [19]:
#In order to use sql on the dataframe we have to insert the tables in the Spark cluster

flights.name = flights.createOrReplaceTempView('flights')

In [20]:
my_spark.catalog.listTables()


[Table(name='flights', database=None, description=None, tableType='TEMPORARY', isTemporary=True),
 Table(name='temp', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [21]:
flights.printSchema

<bound method DataFrame.printSchema of DataFrame[year: string, month: string, day: string, dep_time: string, dep_delay: string, arr_time: string, arr_delay: string, carrier: string, tailnum: string, flight: string, origin: string, dest: string, air_time: string, distance: string, hour: string, minute: string]>