In [1]:
import os
from pyspark import SparkContext
from pyspark.sql import SQLContext

# Print SPARK_HOME so the output records which Spark installation was used.
spark_home = os.environ.get('SPARK_HOME', None)
print (spark_home)

# `sc` is only predefined when the notebook is launched through the pyspark
# driver/shell. getOrCreate() returns that existing context, or starts one,
# so the notebook also works after "Restart Kernel -> Run All".
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

/home/yolanda/spark


In [5]:
# Directory holding the AOL query-log files. Set AOL_DATA_DIR to point elsewhere
# instead of editing an absolute, machine-specific path.
AOL_DATA_DIR = os.environ.get("AOL_DATA_DIR", "/home/yolanda/spark/AOL-user-ct-collection")
# The glob matches every part file, and each part is read as lines of text.
rdd_txt = sc.textFile(os.path.join(AOL_DATA_DIR, "user-ct-test-collection-*.txt"))



In [6]:
# Count every line across all input files, header lines included
# (about 2 minutes in parallel on the original run).
rdd_txt.count()

36389577

In [7]:
# Peek at the first five raw lines; the very first one is the column header.
rdd_txt.take(num=5)

[u'AnonID\tQuery\tQueryTime\tItemRank\tClickURL',
 u'724\tcarbol tunnel\t2006-03-01 01:01:21\t\t',
 u'724\thow to install a glue down floor\t2006-03-01 07:13:45\t2\thttp://doityourself.com',
 u'724\thow to install a glue down floor\t2006-03-01 07:13:45\t8\thttp://www.homerenovationguide.com',
 u'724\thow to install a glue down floor\t2006-03-01 07:13:45\t9\thttp://www.hardwoodinstaller.com']

In [8]:
# The first line of the data is the tab-separated column header.
header = rdd_txt.first()
header

u'AnonID\tQuery\tQueryTime\tItemRank\tClickURL'

In [9]:
# The glob reads several files and each may start with its own header line, so
# drop every line that begins with the "AnonID" column name, not just the first.
rdd_txt = rdd_txt.filter(lambda l: not l.startswith("AnonID"))

In [10]:
# Count lines after removing the header rows (about 2 minutes in parallel on the original run).
rdd_txt.count()

36389567

In [10]:
# Number of distinct users. AnonID is the first tab-separated field, so one split is enough to isolate it.
rdd_txt.map(lambda line: line.split("\t", 1)[0]).distinct().count()

657427

In [15]:
# Same distinct-user count, computed by grouping lines under their AnonID.
# Grouping sends every full line through the shuffle, so it costs more than
# the map/distinct variant.
rdd_txt.keyBy(lambda line: line.split("\t")[0]).groupByKey().count()

657427

In [19]:
# Distinct users with at least one line containing "australia" (case-insensitive).
# The test runs on the whole raw line, so a hit in the ClickURL field counts too,
# not only a hit in the query text.
(rdd_txt
    .filter(lambda line: "australia" in line.lower())
    .map(lambda line: line.split("\t")[0])
    .distinct()
    .count())

3659

In [20]:
# Split each line into its tab-separated fields, in header order:
# AnonID, Query, QueryTime, ItemRank, ClickURL.
df = rdd_txt.map(lambda line: line.split("\t"))

In [21]:
from datetime import datetime

def raise_err(r):
    """Return True if the split log row ``r`` is malformed, else False.

    Despite its name, this never raises; it is a predicate for ``RDD.filter``.
    A row is malformed when field 0 (AnonID) is not an integer, field 2
    (QueryTime) does not match ``%Y-%m-%d %H:%M:%S``, or the row has too few
    fields to contain both.
    """
    try:
        int(r[0])
        datetime.strptime(r[2], "%Y-%m-%d %H:%M:%S")
    except (ValueError, TypeError, IndexError):
        # Only parse/shape failures mark a row as bad. A bare `except:` would
        # also swallow KeyboardInterrupt/SystemExit and hide genuine bugs.
        return True
    return False

# Keep only the rows that raise_err flags as malformed, then count them.
err = df.filter(raise_err)
err.count()

1

In [22]:
# Inspect up to five malformed rows.
err.take(num=5)

[[u'\x19403684',
  u'match.com',
  u'2006-03-31 06:55:53',
  u'2',
  u'http://www.match.com']]

In [23]:
# Drop rows that raise_err flags as malformed, so the int/strptime parse in the
# next step cannot fail.
df = df.filter(lambda r: not raise_err(r))

In [24]:
# Keep (AnonID as int, Query, QueryTime as datetime); ItemRank and ClickURL are dropped.
df = df.map(
    lambda fields: (
        int(fields[0]),
        fields[1],
        datetime.strptime(fields[2], "%Y-%m-%d %H:%M:%S"),
    )
)

In [25]:
# First parsed row, to confirm the (id, query, time) tuple layout.
df.first()

(997, u'carreermanagement.com', datetime.datetime(2006, 3, 1, 10, 54, 58))

In [26]:
from pyspark.sql.types import *
# Schema for the parsed (id, query, time) tuples; all three columns are nullable.
schema = StructType([
    StructField("id", IntegerType(), nullable=True),
    StructField("query", StringType(), nullable=True),
    StructField("time", TimestampType(), nullable=True),
])

# Turn the tuple RDD into a DataFrame and expose it to SQL as the "aol" table.
df = sqlContext.createDataFrame(df, schema=schema)
df.registerTempTable("aol")

In [27]:
# Full query history of a single user, in time order.
# NOTE(review): these look like real individuals' search logs; consider clearing
# per-user outputs before sharing or committing the notebook.
sqlContext.sql("select * from aol where id = 711391 order by time").show()

+------+--------------------+--------------------+
|    id|               query|                time|
+------+--------------------+--------------------+
|711391|can not sleep wit...|2006-03-01 01:24:...|
|711391|cannot sleep with...|2006-03-01 01:24:...|
|711391|cannot sleep with...|2006-03-01 01:24:...|
|711391|cannot sleep with...|2006-03-01 01:33:...|
|711391|  jackie zeaman nude|2006-03-01 15:26:...|
|711391|   jackie zeman nude|2006-03-01 15:26:...|
|711391|      strange cosmos|2006-03-01 16:07:...|
|711391|mansfield first a...|2006-03-01 16:09:...|
|711391|mansfield first a...|2006-03-01 16:09:...|
|711391|reverend harry myers|2006-03-01 16:10:...|
|711391|reverend harry myers|2006-03-01 16:10:...|
|711391|   national enquirer|2006-03-01 17:13:...|
|711391|how to kill mocki...|2006-03-01 17:18:...|
|711391|how to kill mocki...|2006-03-01 17:18:...|
|711391|how to kill annoy...|2006-03-01 17:18:...|
|711391|how to kill annoy...|2006-03-01 17:19:...|
|711391|how to rid your y...|20

In [28]:
# Earliest query timestamp in the whole data set (global aggregate, no grouping key).
df.groupBy().agg({"time": "min"}).collect()

[Row(min(time)=datetime.datetime(2006, 3, 1, 0, 1, 3))]

In [29]:
# Every row of user 71845, pulled to the driver as a pandas DataFrame.
user_71845 = df.filter(df["id"] == 71845)

pd_71845 = user_71845.toPandas()

In [30]:
# First five queries of that user.
pd_71845.head(n=5)

Unnamed: 0,id,query,time
0,71845,-,2006-04-04 06:12:36
1,71845,-,2006-04-04 06:57:56
2,71845,-,2006-04-04 07:11:52
3,71845,-,2006-04-04 07:12:02
4,71845,-,2006-04-04 07:14:05


In [31]:
# Number of logged queries per calendar day, ordered by date.
sql_by_dates = """
               select cast(time as Date) as dt
                     ,count(*) as cnt
               from aol
               group by cast(time as Date)
               order by dt
               """

by_dates = sqlContext.sql(sql_by_dates)



In [53]:
# One row per day, small enough to collect to the driver as pandas.
pd_by_dates = by_dates.toPandas()
pd_by_dates.head(n=5)

Unnamed: 0,dt,cnt
0,2006-03-01,454226
1,2006-03-02,474107
2,2006-03-03,428053
3,2006-03-04,467858
4,2006-03-05,515973


In [50]:
# NOTE(review): `bokeh.charts` (TimeSeries, Bar) was deprecated and later removed
# from Bokeh (split out into the separate `bkcharts` package). This cell likely
# needs an old, pinned Bokeh release or a port to `bokeh.plotting`. Confirm.
from bokeh.charts import TimeSeries, Bar, output_notebook, show
from bokeh.models import PrintfTickFormatter
# Render Bokeh output inline in the notebook.
output_notebook()

In [54]:
# Preview the daily counts indexed by date. set_index returns a new frame and the
# result is not assigned, so pd_by_dates itself keeps its RangeIndex and "dt" column.
pd_by_dates.set_index("dt")



Unnamed: 0_level_0,cnt
dt,Unnamed: 1_level_1
2006-03-01,454226
2006-03-02,474107
2006-03-03,428053
2006-03-04,467858
2006-03-05,515973
2006-03-06,476371
2006-03-07,470863
2006-03-08,456205
2006-03-09,455997
2006-03-10,421557


In [56]:
# Plot inputs: daily query counts and their dates. The dates must come from the
# "dt" column. pd_by_dates.set_index("dt") was only displayed, never assigned, so
# pd_by_dates.index is a 0..N-1 RangeIndex, not dates.
data = dict(count=pd_by_dates["cnt"], Date=pd_by_dates["dt"])
