In [1]:
## Connect to a Postgres table.
## The first line imports the SQLContext class, which is needed to access SQL databases in Spark:

In [3]:
from pyspark.sql import SQLContext

In [3]:
## The second line creates a new SQLContext from the SparkContext sc:

In [4]:
# Build a SQLContext on top of the already-running SparkContext `sc`;
# the trailing expression displays the resulting object's type.
sqlsc = SQLContext(sparkContext=sc)
type(sqlsc)

pyspark.sql.context.SQLContext

In [5]:
## The cell below creates a new Spark DataFrame in the variable df for the Postgres table gameclicks.

# format("jdbc") says that the DataFrame will be read over a Java Database Connectivity (JDBC) connection, the url option is the
# connection string used to reach the Postgres database, and the dbtable option specifies the gameclicks table.


In [4]:
# Read the Postgres `gameclicks` table over JDBC into a Spark DataFrame.
# NOTE(review): the database user is embedded in the URL; consider supplying
# credentials through configuration instead of hard-coding them.
df = (
    sqlsc.read
    .format("jdbc")
    .options(
        url="jdbc:postgresql://localhost/cloudera?user=cloudera",
        dbtable="gameclicks",
    )
    .load()
)

In [5]:
## Step 3. View Spark DataFrame schema and count rows. We can call the printSchema()
## method to view the schema of the DataFrame:

In [6]:
# Print the column names, data types, and nullability Spark read from the gameclicks table.
df.printSchema()

root
 |-- timestamp: timestamp (nullable = false)
 |-- clickid: integer (nullable = false)
 |-- userid: integer (nullable = false)
 |-- usersessionid: integer (nullable = false)
 |-- ishit: integer (nullable = false)
 |-- teamid: integer (nullable = false)
 |-- teamlevel: integer (nullable = false)



In [9]:
## We call the count() method to count the number of rows in the DataFrame:

In [7]:
# count() is an action: it runs a query against the gameclicks table and returns the row count.
df.count()

755806

In [None]:
# Step 4. View the contents of the DataFrame. We can call the show() method to display its first rows:

In [8]:
# Display the first five rows (long values such as timestamps are truncated by default).
df.show(n=5)

+--------------------+-------+------+-------------+-----+------+---------+
|           timestamp|clickid|userid|usersessionid|ishit|teamid|teamlevel|
+--------------------+-------+------+-------------+-----+------+---------+
|2016-05-26 15:06:...|    105|  1038|         5916|    0|    25|        1|
|2016-05-26 15:07:...|    154|  1099|         5898|    0|    44|        1|
|2016-05-26 15:07:...|    229|   899|         5757|    0|    71|        1|
|2016-05-26 15:07:...|    322|  2197|         5854|    0|    99|        1|
|2016-05-26 15:07:...|     22|  1362|         5739|    0|    13|        1|
+--------------------+-------+------+-------------+-----+------+---------+
only showing top 5 rows



In [14]:
# Step 5. Filter columns in the DataFrame. We can select one or more columns by calling the select() method:

In [9]:
# Project just the click id and user id columns, then show five rows.
df.select(["clickid", "userid"]).show(5)

+-------+------+
|clickid|userid|
+-------+------+
|    105|  1038|
|    154|  1099|
|    229|   899|
|    322|  2197|
|     22|  1362|
+-------+------+
only showing top 5 rows



In [12]:
## Step 6. Filter rows based on criteria. Here we keep only rows whose teamlevel is greater than 3 and display three of their columns.
# `where` is an alias of `filter`; keep rows above team level 3 and show three columns.
df.where(df.teamlevel > 3).select("userid", "teamid", "teamlevel").show(5)

+------+------+---------+
|userid|teamid|teamlevel|
+------+------+---------+
|  1183|    32|        4|
|  1687|    53|        4|
|  2009|    64|        4|
|  1307|    72|        4|
|  1457|    77|        4|
+------+------+---------+
only showing top 5 rows



In [None]:
## Step 7. Group by a column and count. 

In [11]:
# Count rows per team level. Spark does not guarantee the row order of a groupBy
# result, so sort explicitly to make the displayed table reproducible.
df.groupBy("teamlevel").count().orderBy("teamlevel").show()

+---------+------+
|teamlevel| count|
+---------+------+
|        1| 67271|
|        2| 80950|
|        3| 98823|
|        4|111176|
|        5|117099|
|        6|122757|
|        7|106436|
|        8| 51294|
+---------+------+



In [13]:
# Count rows per ishit value. Sort explicitly because groupBy output order is not
# guaranteed, which would otherwise make the displayed table non-reproducible.
df.groupBy("ishit").count().orderBy("ishit").show()

+-----+------+
|ishit| count|
+-----+------+
|    0|672423|
|    1| 83383|
+-----+------+



In [None]:
## Step 8. Calculate average and sum.
#  First, we import PySpark's SQL functions for the aggregate operations. Next,
#  we can calculate the average and total values by calling the mean() and sum() functions, respectively:

In [14]:
# Import the SQL functions under a namespace instead of `import *`: a star import
# shadows Python builtins such as sum, min, max, abs and round for every later cell.
from pyspark.sql import functions as F

# Average and total of the ishit column (output columns: avg(ishit), sum(ishit)).
df.select(F.mean("ishit"), F.sum("ishit")).show()

+------------------+----------+
|        avg(ishit)|sum(ishit)|
+------------------+----------+
|0.1103232840173272|     83383|
+------------------+----------+



In [None]:
# Step 9. Join two DataFrames. We can merge or join two DataFrames on a single column.


In [15]:
# Read the Postgres `adclicks` table over JDBC, using the same connection as gameclicks.
# NOTE(review): the database user is embedded in the URL; consider supplying
# credentials through configuration instead of hard-coding them.
adclicks_reader = sqlsc.read.format("jdbc").options(
    url="jdbc:postgresql://localhost/cloudera?user=cloudera",
    dbtable="adclicks",
)
df2 = adclicks_reader.load()

In [27]:
# Show the adclicks schema to find the columns it shares with gameclicks (e.g. userid).
df2.printSchema()

root
 |-- timestamp: timestamp (nullable = false)
 |-- txid: integer (nullable = false)
 |-- usersessionid: integer (nullable = false)
 |-- teamid: integer (nullable = false)
 |-- userid: integer (nullable = false)
 |-- adid: integer (nullable = false)
 |-- adcategory: string (nullable = false)



In [26]:
# Preview the first five ad-click rows.
df2.show(n=5)

+--------------------+----+-------------+------+------+----+-----------+
|           timestamp|txid|usersessionid|teamid|userid|adid| adcategory|
+--------------------+----+-------------+------+------+----+-----------+
|2016-05-26 15:13:...|5974|         5809|    27|   611|   2|electronics|
|2016-05-26 15:17:...|5976|         5705|    18|  1874|  21|     movies|
|2016-05-26 15:22:...|5978|         5791|    53|  2139|  25|  computers|
|2016-05-26 15:22:...|5973|         5756|    63|   212|  10|    fashion|
|2016-05-26 15:22:...|5980|         5920|     9|  1027|  20|   clothing|
+--------------------+----+-------------+------+------+----+-----------+
only showing top 5 rows



In [None]:
## We can see that the adclicks DataFrame (df2) also has a column called userid. Next, we
# combine the gameclicks and adclicks DataFrames by calling the join() method and save
# the resulting DataFrame in a variable called merge. Columns that exist in both tables besides
# userid (timestamp, usersessionid, teamid) appear twice in the result.


In [28]:
# Inner-join gameclicks and adclicks on userid. Both inputs are aliased because
# timestamp, usersessionid and teamid exist in both tables and would otherwise be
# ambiguous by name; with aliases they can be selected as e.g. "gameclicks.teamid"
# and "adclicks.teamid". The resulting schema is the same as an un-aliased join.
merge = df.alias("gameclicks").join(df2.alias("adclicks"), "userid")

In [29]:
# Inspect the joined schema. Columns present in both inputs other than the join key
# userid (timestamp, usersessionid, teamid) appear twice.
merge.printSchema()

root
 |-- userid: integer (nullable = false)
 |-- timestamp: timestamp (nullable = false)
 |-- clickid: integer (nullable = false)
 |-- usersessionid: integer (nullable = false)
 |-- ishit: integer (nullable = false)
 |-- teamid: integer (nullable = false)
 |-- teamlevel: integer (nullable = false)
 |-- timestamp: timestamp (nullable = false)
 |-- txid: integer (nullable = false)
 |-- usersessionid: integer (nullable = false)
 |-- teamid: integer (nullable = false)
 |-- adid: integer (nullable = false)
 |-- adcategory: string (nullable = false)

