In [1]:
from pyspark.sql import SQLContext
import pandas as pd

In [2]:
sqlsc = SQLContext(sc)

In [3]:
df = sqlsc.read.format("jdbc") \
  .option("url", "jdbc:postgresql://localhost/cloudera?user=cloudera") \
  .option("dbtable", "gameclicks") \
  .load()

In [4]:
df.printSchema()

root
 |-- timestamp: timestamp (nullable = false)
 |-- clickid: integer (nullable = false)
 |-- userid: integer (nullable = false)
 |-- usersessionid: integer (nullable = false)
 |-- ishit: integer (nullable = false)
 |-- teamid: integer (nullable = false)
 |-- teamlevel: integer (nullable = false)



In [5]:
df.count()

755806

In [6]:
df.head(5)

[Row(timestamp=datetime.datetime(2016, 5, 26, 15, 6, 55), clickid=105, userid=1038, usersessionid=5916, ishit=0, teamid=25, teamlevel=1),
 Row(timestamp=datetime.datetime(2016, 5, 26, 15, 7, 9), clickid=154, userid=1099, usersessionid=5898, ishit=0, teamid=44, teamlevel=1),
 Row(timestamp=datetime.datetime(2016, 5, 26, 15, 7, 14), clickid=229, userid=899, usersessionid=5757, ishit=0, teamid=71, teamlevel=1),
 Row(timestamp=datetime.datetime(2016, 5, 26, 15, 7, 14), clickid=322, userid=2197, usersessionid=5854, ishit=0, teamid=99, teamlevel=1),
 Row(timestamp=datetime.datetime(2016, 5, 26, 15, 7, 20), clickid=22, userid=1362, usersessionid=5739, ishit=0, teamid=13, teamlevel=1)]

In [7]:
df.show(5)

+--------------------+-------+------+-------------+-----+------+---------+
|           timestamp|clickid|userid|usersessionid|ishit|teamid|teamlevel|
+--------------------+-------+------+-------------+-----+------+---------+
|2016-05-26 15:06:...|    105|  1038|         5916|    0|    25|        1|
|2016-05-26 15:07:...|    154|  1099|         5898|    0|    44|        1|
|2016-05-26 15:07:...|    229|   899|         5757|    0|    71|        1|
|2016-05-26 15:07:...|    322|  2197|         5854|    0|    99|        1|
|2016-05-26 15:07:...|     22|  1362|         5739|    0|    13|        1|
+--------------------+-------+------+-------------+-----+------+---------+
only showing top 5 rows



In [8]:
df.select('userid','teamid').show(5)

+------+------+
|userid|teamid|
+------+------+
|  1038|    25|
|  1099|    44|
|   899|    71|
|  2197|    99|
|  1362|    13|
+------+------+
only showing top 5 rows



In [10]:
df.filter(df['teamlevel'] == 1).show(5)

+--------------------+-------+------+-------------+-----+------+---------+
|           timestamp|clickid|userid|usersessionid|ishit|teamid|teamlevel|
+--------------------+-------+------+-------------+-----+------+---------+
|2016-05-26 15:06:...|    105|  1038|         5916|    0|    25|        1|
|2016-05-26 15:07:...|    154|  1099|         5898|    0|    44|        1|
|2016-05-26 15:07:...|    229|   899|         5757|    0|    71|        1|
|2016-05-26 15:07:...|    322|  2197|         5854|    0|    99|        1|
|2016-05-26 15:07:...|     22|  1362|         5739|    0|    13|        1|
+--------------------+-------+------+-------------+-----+------+---------+
only showing top 5 rows



In [11]:
df.filter(df['teamlevel'] == 2).select('userid','teamid').show(10)

+------+------+
|userid|teamid|
+------+------+
|  1513|    13|
|   868|    35|
|  1453|    22|
|  1282|    93|
|  1473|    75|
|   812|    36|
|  2359|    85|
|  2048|    78|
|  1928|    82|
|  1304|    77|
+------+------+
only showing top 10 rows



In [13]:
df.groupBy('teamlevel').count().show()

+---------+------+
|teamlevel| count|
+---------+------+
|        1| 67271|
|        2| 80950|
|        3| 98823|
|        4|111176|
|        5|117099|
|        6|122757|
|        7|106436|
|        8| 51294|
+---------+------+



In [14]:
from pyspark.sql.functions import *

In [16]:
df.select(sum('ishit'),mean('ishit')).show()

+----------+------------------+
|sum(ishit)|        avg(ishit)|
+----------+------------------+
|     83383|0.1103232840173272|
+----------+------------------+



In [21]:
df2 = sqlsc.read.format("jdbc").option("url","jdbc:postgresql://localhost/cloudera?user=cloudera").option("dbtable","adclicks").load()

In [22]:
df2.printSchema()

root
 |-- timestamp: timestamp (nullable = false)
 |-- txid: integer (nullable = false)
 |-- usersessionid: integer (nullable = false)
 |-- teamid: integer (nullable = false)
 |-- userid: integer (nullable = false)
 |-- adid: integer (nullable = false)
 |-- adcategory: string (nullable = false)



In [23]:
merge = df.join(df2,'userid')

In [24]:
merge.show(5)

+------+--------------------+-------+-------------+-----+------+---------+--------------------+-----+-------------+------+----+----------+
|userid|           timestamp|clickid|usersessionid|ishit|teamid|teamlevel|           timestamp| txid|usersessionid|teamid|adid|adcategory|
+------+--------------------+-------+-------------+-----+------+---------+--------------------+-----+-------------+------+----+----------+
|   231|2016-06-08 00:45:...| 376796|        23626|    0|   142|        4|2016-06-08 01:40:...|23669|        23626|   142|  27|     games|
|   231|2016-06-08 00:45:...| 376796|        23626|    0|   142|        4|2016-06-08 09:24:...|24122|        23626|   142|   4|     games|
|   231|2016-06-08 00:45:...| 376796|        23626|    0|   142|        4|2016-06-08 17:21:...|24659|        23626|   142|  22| computers|
|   231|2016-06-08 00:45:...| 376796|        23626|    0|   142|        4|2016-06-08 23:34:...|25076|        23626|   142|  21|    movies|
|   231|2016-06-08 00:45:..

In [25]:
merge.printSchema()

root
 |-- userid: integer (nullable = false)
 |-- timestamp: timestamp (nullable = false)
 |-- clickid: integer (nullable = false)
 |-- usersessionid: integer (nullable = false)
 |-- ishit: integer (nullable = false)
 |-- teamid: integer (nullable = false)
 |-- teamlevel: integer (nullable = false)
 |-- timestamp: timestamp (nullable = false)
 |-- txid: integer (nullable = false)
 |-- usersessionid: integer (nullable = false)
 |-- teamid: integer (nullable = false)
 |-- adid: integer (nullable = false)
 |-- adcategory: string (nullable = false)

