In [1]:
# bind to spark and initialize it

import findspark
findspark.init()
import pyspark
#  introduce most of the functionality of PySpark dataframes and SQL functionality
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

# start Spark Session
conf = pyspark.SparkConf().setAppName('appName').setMaster('local')
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)

# check right spark
findspark.find()

'C:\\spark'

In [3]:
# full source in pys_sql.py
from IPython.core.magic import register_line_cell_magic

# Configuration parameters
max_show_lines = 50         # Limit on the number of lines to show with %sql_show and %sql_display
detailed_explain = True     # Set to False if you want to see only the physical plan when running explain


@register_line_cell_magic
def sql(line, cell=None):
    "Return a Spark DataFrame for lazy evaluation of the sql. Use: %sql or %%sql"
    val = cell if cell is not None else line 
    return spark.sql(val)

@register_line_cell_magic
def sql_show(line, cell=None):
    "Execute sql and show the first max_show_lines lines. Use: %sql_show or %%sql_show"
    val = cell if cell is not None else line 
    return spark.sql(val).show(max_show_lines) 

@register_line_cell_magic
def sql_display(line, cell=None):
    """Execute sql and convert results to Pandas DataFrame for pretty display or further processing.
    Use: %sql_display or %%sql_display"""
    val = cell if cell is not None else line 
    return spark.sql(val).limit(max_show_lines).toPandas() 

@register_line_cell_magic
def sql_explain(line, cell=None):
    "Display the execution plan of the sql. Use: %sql_explain or %%sql_explain"
    val = cell if cell is not None else line 
    return spark.sql(val).explain(detailed_explain)

## Some small Demos

In [4]:
%%sql_display
--#this is a cell in sql  
select 2+1

Unnamed: 0,(2 + 1)
0,3


In [31]:
# example spark operation
nums = sc.parallelize(range(1,4))
nums.map(lambda x: x*x).collect()


[1, 4, 9]

In [75]:
# # to stop the PySpark Session 
# sc.stop()
#

In [76]:
# sc = pyspark.SparkContext(conf=conf)
# spark = SparkSession(sc)

# File location and type
file_location = "C:\\Users\\Skyler\\Downloads\\worldnews.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
products = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(products)
products.createOrReplaceTempView("rc1")

DataFrame[_c0: string, body: string, author: string, created_utc: string, subreddit_id: string, link_id: string, parent_id: string, score: string, retrieved_on: string, controversiality: string, gilded: string, id: string, subreddit: string]

In [77]:
%%sql_display

select * from rc1 limit 5

Unnamed: 0,_c0,body,author,created_utc,subreddit_id,link_id,parent_id,score,retrieved_on,controversiality,gilded,id,subreddit
0,0,Slavs were considered Untermensch (less than h...,Say_less_fam,1560902706,t5_2qh13,t3_c2348g,t1_eriaihr,9.0,1570234835.0,0.0,0,erio2yn,worldnews
1,1,[removed],[deleted],1560125984,t5_2qh13,t3_bykehl,t1_eqk7hl9,1.0,1569308329.0,0.0,0,eqkloob,worldnews
2,2,It's been proven that there were several gassi...,,,,,,,,,,,
3,It's never been proven that Assad gassed his o...,timelow,1560644549,t5_2qh13,t3_c11ea8,t1_eragun2,3,1570058384.0,0.0,0.0,erahzo3,worldnews,
4,3,Heading towards?,,,,,,,,,,,


In [14]:
from pathlib import Path

jsonpath=

people = spark.read.json()
# people.printSchema()
people.createOrReplaceTempView("rcjson")


root
 |-- author: string (nullable = true)
 |-- author_flair_css_class: string (nullable = true)
 |-- author_flair_text: string (nullable = true)
 |-- body: string (nullable = true)
 |-- controversiality: long (nullable = true)
 |-- created_utc: long (nullable = true)
 |-- distinguished: string (nullable = true)
 |-- edited: boolean (nullable = true)
 |-- gilded: long (nullable = true)
 |-- id: string (nullable = true)
 |-- link_id: string (nullable = true)
 |-- parent_id: string (nullable = true)
 |-- retrieved_on: long (nullable = true)
 |-- score: long (nullable = true)
 |-- stickied: boolean (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- ups: long (nullable = true)



In [15]:
%%sql_display

select * from rcjson limit 5

Unnamed: 0,author,author_flair_css_class,author_flair_text,body,controversiality,created_utc,distinguished,edited,gilded,id,link_id,parent_id,retrieved_on,score,stickied,subreddit,subreddit_id,ups
0,frjo,,,A look at Vietnam and Mexico exposes the myth ...,0,1134365188,,False,0,c13,t3_17863,t3_17863,1473738411,2,False,reddit.com,t5_6,2
1,zse7zse,,,"The site states ""What can I use it for? Meetin...",0,1134365725,,False,0,c14,t3_17866,t3_17866,1473738411,1,False,reddit.com,t5_6,1
2,[deleted],,,Jython related topics by Frank Wierzbicki,0,1134366848,,False,0,c15,t3_17869,t3_17869,1473738411,0,False,reddit.com,t5_6,0
3,[deleted],,,[deleted],0,1134367660,,False,0,c16,t3_17870,t3_17870,1473738411,1,False,reddit.com,t5_6,1
4,rjoseph,,,Saft is by far the best extension you could ta...,0,1134367754,,False,0,c17,t3_17817,t3_17817,1473738411,1,False,reddit.com,t5_6,1


In [16]:
# write database to persistant form
permanent_table_name = "reddit"
products.write.format("parquet").saveAsTable(permanent_table_name)

AnalysisException: Table `reddit` already exists.;

In [85]:
# sc.stop()

Here is the first bit of heavy lifting where PySpark excels. It can directly read the bz2 files and parse the json.

In [32]:
df=spark.read.json("I:\\reddit_data\\reddit\\2006\\RC_2006-12.bz2")
df.select("author","body","created_utc","score").show(2)

+-------------+--------------------+-----------+-----+
|       author|                body|created_utc|score|
+-------------+--------------------+-----------+-----+
|procrastitron|Thanks for the li...| 1164931204|   12|
|       luke_s|Even today some o...| 1164931222|    6|
+-------------+--------------------+-----------+-----+
only showing top 2 rows

