In [2]:
# Must run before any `import pyspark` in this notebook (the next cell relies on it).
import findspark
findspark.init()

In [3]:
import pyspark # only run after findspark.init()
from pyspark.sql.session import SparkSession
# Reuse the session if one is already active in this process, otherwise start a new one.
# `spark` is the handle every later cell (including run_sql) goes through.
spark = SparkSession.builder.getOrCreate()

In [4]:
# Bare expression: renders the session object as the cell output.
spark

In [13]:
#from pyspark.context import SparkContext
#from pyspark.sql.context import SQLContext

In [15]:
#SparkContext
#SQLContext

In [16]:
# Smoke test: a constant single-row query confirms the session can execute SQL.
df = spark.sql('''select 'spark' as hello ''')
df.show()

+-----+
|hello|
+-----+
|spark|
+-----+



### Create a utility function to run SQL commands

In [17]:
def run_sql(statement):
    """Run a SQL statement through the notebook's ``spark`` session.

    Parameters
    ----------
    statement : str
        SQL text handed unchanged to ``spark.sql``.

    Returns
    -------
    Whatever ``spark.sql(statement)`` returns, or ``None`` when the call
    raised. Failures are reported on stdout instead of propagating, so
    callers that chain on the result should check for ``None`` first.
    """
    try:
        return spark.sql(statement)
    except Exception as e:
        # `desc` / `stackTrace` are not defined on a generic Exception, so
        # reading them unconditionally would raise AttributeError from inside
        # this handler and mask the real error. Use them when present and
        # fall back to the exception itself otherwise.
        description = getattr(e, 'desc', None)
        stack_trace = getattr(e, 'stackTrace', None)
        if description is None:
            description = repr(e)
        if stack_trace is None:
            print(description)
        else:
            print(description, '\n', stack_trace)
        return None

In [18]:
# List the databases in the catalog; toPandas() is the last expression so the
# result renders as a table in the cell output.
dbs = run_sql('show databases')
dbs.toPandas()

Unnamed: 0,namespace
0,country_club
1,default


In [19]:
# Rebuild country_club from scratch: the drop uses CASCADE, so any tables
# already in the database are removed too. Then list databases to confirm.
for ddl_statement in ('drop database if exists country_club cascade',
                      'create database country_club'):
    run_sql(ddl_statement)
dbs = run_sql('show databases')
dbs.toPandas()

Unnamed: 0,namespace
0,country_club
1,default


In [20]:
# File locations (relative to the notebook's working directory) and type
file_location_bookings = "./Bookings.csv"
file_location_facilities = "./Facilities.csv"
file_location_members = "./Members.csv"

file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","


def load_source_file(file_location):
    """Read one source file into a Spark DataFrame.

    Uses the reader settings defined above (``file_type``, ``infer_schema``,
    ``first_row_is_header``, ``delimiter``), so all three tables are loaded
    the same way and a settings change only has to be made in one place.

    Parameters
    ----------
    file_location : str
        Path of the file to load.

    Returns
    -------
    The DataFrame produced by ``spark.read...load(file_location)``.
    """
    # The applied options are for CSV files. For other file types, these will be ignored.
    return (spark.read.format(file_type)
                 .option("inferSchema", infer_schema)
                 .option("header", first_row_is_header)
                 .option("sep", delimiter)
                 .load(file_location))


bookings_df = load_source_file(file_location_bookings)
facilities_df = load_source_file(file_location_facilities)
members_df = load_source_file(file_location_members)

In [26]:
# Print each loaded DataFrame's schema under its own heading.
for heading, frame in (
    ('Bookings Schema', bookings_df),
    ('Facilities Schema', facilities_df),
    ('Members Schema', members_df),
):
    print(heading)
    frame.printSchema()

Bookings Schema
root
 |-- bookid: integer (nullable = true)
 |-- facid: integer (nullable = true)
 |-- memid: integer (nullable = true)
 |-- starttime: string (nullable = true)
 |-- slots: integer (nullable = true)

Facilities Schema
root
 |-- facid: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- membercost: double (nullable = true)
 |-- guestcost: double (nullable = true)
 |-- initialoutlay: integer (nullable = true)
 |-- monthlymaintenance: integer (nullable = true)

Members Schema
root
 |-- memid: integer (nullable = true)
 |-- surname: string (nullable = true)
 |-- firstname: string (nullable = true)
 |-- address: string (nullable = true)
 |-- zipcode: integer (nullable = true)
 |-- telephone: string (nullable = true)
 |-- recommendedby: integer (nullable = true)
 |-- joindate: string (nullable = true)



### Create permanent tables

In [22]:
permanent_table_name_bookings = "country_club.Bookings"
permanent_table_name_facilities = "country_club.Facilities"
permanent_table_name_members = "country_club.Members"

# Persist each DataFrame as a parquet-backed table in the country_club database.
# mode("overwrite") makes this cell safe to re-run: without an explicit mode the
# writer refuses to save when the target table already exists.
for frame, table_name in (
    (bookings_df, permanent_table_name_bookings),
    (facilities_df, permanent_table_name_facilities),
    (members_df, permanent_table_name_members),
):
    frame.write.format("parquet").mode("overwrite").saveAsTable(table_name)

### Refresh tables and check them

In [23]:
# Switch to country_club so the unqualified table names below resolve there,
# refresh each table, then list the tables to confirm they are registered.
run_sql('use country_club')
for table_name in ('bookings', 'facilities', 'members'):
    run_sql(f'REFRESH table {table_name}')
tbls = run_sql('show tables')
tbls.toPandas()

Unnamed: 0,database,tableName,isTemporary
0,country_club,bookings,False
1,country_club,facilities,False
2,country_club,members,False


In [24]:
# Preview the bookings table; LIMIT 3 keeps the rendered output small.
result = run_sql('''
                    SELECT * 
                    FROM bookings 
                    LIMIT 3
                 ''')
result.toPandas()

Unnamed: 0,bookid,facid,memid,starttime,slots
0,0,3,1,2012-07-03 11:00:00,2
1,1,4,1,2012-07-03 08:00:00,2
2,2,6,0,2012-07-03 18:00:00,2


In [25]:
# Names of the facilities whose membercost is greater than zero.
result = run_sql('''
                 select name 
                 from facilities
                 where membercost > 0
                 ''')
result.toPandas()

Unnamed: 0,name
0,Tennis Court 1
1,Tennis Court 2
2,Massage Room 1
3,Massage Room 2
4,Squash Court


### Spark Context - sc

In [41]:
from pyspark import SparkConf
from pyspark.context import SparkContext

In [42]:
# getOrCreate hands back the context that is already running if there is one;
# the SparkConf passed here is only used when a new context has to be created.
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
# Raw string: "\s" and "\R" are not valid escape sequences, so the plain literal
# only worked by accident and triggers an invalid-escape warning. The path value
# itself is unchanged.
# NOTE(review): this is a machine-specific absolute path — adjust for your install.
rdd = sc.textFile(r"C:\spark\README.md")

In [44]:
# Number of records in the RDD built from the text file above.
rdd.count()

109