In [162]:
from pyspark.sql.functions import size, udf, unix_timestamp, max, min

In [167]:
from pyspark.sql.types import IntegerType, LongType

In [116]:
from pyspark.sql.functions import array_contains


# Load metadata avro

In [117]:
# Reader bound to the Databricks spark-avro data source.
# NOTE(review): `sqlContext` is not defined in the visible cells; presumably it is
# provided by the PySpark notebook session -- confirm before running on a fresh kernel.
reader = sqlContext.read.format('com.databricks.spark.avro')

In [118]:
# Path is kept in a variable, like the other *File paths in this notebook.
metaFile = 'data/spark_metadata.avro'
meta = reader.load(metaFile)

In [119]:
meta.columns

['id',
 'path',
 'message_id',
 'date',
 'from',
 'to',
 'cc',
 'bcc',
 'subject',
 'references']

# Loading topic distributions

In [120]:
# Topic distributions: comma-separated file without a header row, so the
# columns get the reader's default names and inferred types.
topdisFile = 'data/enron_small_topic_distributions.tuples'

csvLoader = sqlContext.read.format('com.databricks.spark.csv')
topdis = (csvLoader
          .options(delimiter=',', header='false', inferschema='true')
          .load(topdisFile))

In [121]:
topdis.columns

['C0',
 'C1',
 'C2',
 'C3',
 'C4',
 'C5',
 'C6',
 'C7',
 'C8',
 'C9',
 'C10',
 'C11',
 'C12',
 'C13',
 'C14',
 'C15']

In [122]:
def _strip_first_char(value):
    """Drop the first character of `value` and parse the remainder as an int.

    Returns None for a null input so that Spark produces a null cell instead of
    raising inside the UDF (slicing None would fail the whole job).
    """
    if value is None:
        return None
    return int(value[1:])


# UDF that turns the raw C0 strings into integer values.
strip_first_col = udf(_strip_first_char, IntegerType())
topdis = topdis.withColumn('C0', strip_first_col(topdis['C0']))

# Load dictionary CSV

In [123]:
# Dictionary file: tab-separated, no header row, column types inferred by the reader.
dicFile = 'enron_small_dic.csv'

csvLoader = sqlContext.read.format('com.databricks.spark.csv')
dic_csv_options = {'delimiter': '\t', 'header': 'false', 'inferschema': 'true'}
dic = csvLoader.options(**dic_csv_options).load(dicFile)
# Replace the positional default names (C0, C1, C2) with descriptive ones.
dic = dic.select(
    dic['C0'].alias('id'),
    dic['C1'].alias('word'),
    dic['C2'].alias('count'),
)

In [124]:
dic.columns

['id', 'word', 'count']

# Load clustertopics CSV

In [125]:
# Cluster/topic table: comma-separated, no header row. The columns keep the
# reader's default positional names; no renaming is applied here.
clutoFile = 'enron_small_clustertopics.csv'

csvLoader = sqlContext.read.format('com.databricks.spark.csv')
cluto = (csvLoader
         .options(delimiter=',', header='false', inferschema='true')
         .load(clutoFile))

In [126]:
cluto.columns

['C0', 'C1', 'C2', 'C3', 'C4']

# Load topicswords CSV

In [127]:
# Topic/word table (transposed LDA output): comma-separated, no header row.
# The columns keep the reader's default positional names.
towoFile = 'enron_small_lda_transposed.csv'

csvLoader = sqlContext.read.format('com.databricks.spark.csv')
towo = (csvLoader
        .options(delimiter=',', header='false', inferschema='true')
        .load(towoFile))


In [128]:
towo.columns

['C0',
 'C1',
 'C2',
 'C3',
 'C4',
 'C5',
 'C6',
 'C7',
 'C8',
 'C9',
 'C10',
 'C11',
 'C12',
 'C13',
 'C14']

# Merge topdis (keyed by document id) with the metadata, joining on document id

In [129]:
meta.columns

['id',
 'path',
 'message_id',
 'date',
 'from',
 'to',
 'cc',
 'bcc',
 'subject',
 'references']

In [130]:
# Keep only the document id and the message date, with the date parsed from its
# ISO-8601-style string into epoch seconds.
timestamp_col = unix_timestamp(meta['date'], "yyyy-MM-dd'T'HH:mm:ssX").alias("timestamp")
metasmall = meta.select('id', timestamp_col)

# Attach the timestamp to every topic-distribution row; the inner join drops
# rows that have no match on the other side.
doctopdat = topdis.join(
    metasmall,
    metasmall.id == topdis.C0,
    'inner',
)

In [131]:
metasmall.show()

+------+---------+
|    id|timestamp|
+------+---------+
|263168|986997480|
|263169|988382460|
|263170|958665420|
|263171|957974880|
|263172|982673940|
|263173|985683420|
|263174|972667500|
|263175|988361040|
|263176|987674100|
|263177|970148880|
|263178|968759640|
|263179|982778280|
|263180|961145160|
|263181|990440220|
|263182|976064400|
|263183|969376680|
|263184|962993820|
|263185|984742260|
|263186|982754280|
|263187|960397740|
+------+---------+
only showing top 20 rows



In [178]:
# Compute both bounds in a single aggregation so the joined data is scanned once
# instead of launching one Spark job per bound.
# `max` / `min` here are the pyspark.sql.functions aggregates imported at the top
# of the notebook, not the Python builtins.
timestamp_bounds = doctopdat.agg(
    max('timestamp').alias('maxtimestamp'),
    min('timestamp').alias('mintimestamp'),
).collect()[0]
maxdate = timestamp_bounds['maxtimestamp']
mindate = timestamp_bounds['mintimestamp']

In [179]:
print maxdate, mindate


1009958530 991169400


In [181]:
import numpy as np

# Number of equal-width time bins between the earliest and latest timestamp.
NUM_BINS = 30

# Lower edge of every bin, truncated to whole seconds. endpoint=False leaves
# maxdate itself out of the list, so the last bin starts at binborders[-1].
# A list comprehension (rather than map) yields an indexable list on Python 3 too.
binborders = [
    int(border)
    for border in np.linspace(start=mindate, stop=maxdate, num=NUM_BINS, endpoint=False)
]


In [182]:
print binborders[-1]

1009332225


In [191]:
def f(x, borders=None):
    """Return the lower border of the time bin that contains timestamp `x`.

    Parameters
    ----------
    x : int or None
        Timestamp to place into a bin.
    borders : sequence of int, optional
        Ascending lower bin borders. Defaults to the notebook-level `binborders`.

    Returns
    -------
    int or None
        The largest border that is <= `x`. Values at or beyond the last border
        fall into the last bin; values below the first border are clamped to the
        first bin (previously they wrapped around to the last bin through index
        -1). None is returned for a null timestamp or an empty border list.
    """
    # Local import keeps the function self-contained when Spark ships it to workers.
    from bisect import bisect_right

    if borders is None:
        borders = binborders
    if x is None or len(borders) == 0:
        return None

    # Index of the first border strictly greater than x.
    first_greater = bisect_right(borders, x)
    # Deliberately no max()/min() here: this notebook imports pyspark's max/min
    # over the Python builtins.
    if first_greater == 0:
        return borders[0]
    return borders[first_greater - 1]
    
# Wrap f as a Spark UDF returning a long, then tag every row with the value f
# computes from its timestamp.
assign_bin = udf(f, LongType())
doctopdat = doctopdat.withColumn('bin', assign_bin(doctopdat['timestamp']))

In [195]:
# Average the columns within each time bin. avg() is called without arguments,
# so besides the topic columns the result also contains avg(C0), avg(id),
# avg(timestamp) and avg(bin) (see the column listing further down).
# NOTE(review): C1 and C15 do not appear in the averaged output -- presumably
# they were not inferred as numeric when the file was loaded; confirm.
newdataframe = doctopdat.groupBy('bin').avg()

In [200]:
newdataframe.take(1)

[Row(bin=996179834, avg(C0)=1893.71875, avg(C2)=0.07288942446407053, avg(C3)=0.07596484700417556, avg(C4)=0.06602575028057349, avg(C5)=0.05413960548976752, avg(C6)=0.05225609203825071, avg(C7)=0.07715599872767398, avg(C8)=0.05341275211642137, avg(C9)=0.08024595719927158, avg(C10)=0.06522699142101661, avg(C11)=0.06228288576044858, avg(C12)=0.09235627687222708, avg(C13)=0.05351567203849644, avg(C14)=0.07305290255836651, avg(id)=1893.71875, avg(timestamp)=996539539.90625, avg(bin)=996179834.0)]

In [201]:
# Sanity check on the bin value shown above: divide its epoch seconds by the
# number of seconds in 52 weeks. Both operands are ints, so on Python 2 this is
# an integer division.
996179834 / (3600*24*7*52)

31

In [202]:
newdataframe.columns

['bin',
 'avg(C0)',
 'avg(C2)',
 'avg(C3)',
 'avg(C4)',
 'avg(C5)',
 'avg(C6)',
 'avg(C7)',
 'avg(C8)',
 'avg(C9)',
 'avg(C10)',
 'avg(C11)',
 'avg(C12)',
 'avg(C13)',
 'avg(C14)',
 'avg(id)',
 'avg(timestamp)',
 'avg(bin)']

In [217]:
# Length of the covered period in months, based on a 365-day year.
SECONDS_PER_YEAR = 365*24.0*3600
print ((maxdate / SECONDS_PER_YEAR) - (mindate / SECONDS_PER_YEAR)) * 12, "months"

7.14959284627 months
