In [1]:
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
import datetime
sc = SparkContext()
sqlContext = SQLContext(sc)

In [2]:
def parseDate(s):
    # 2015-04-01 07:14:35.553
    return datetime.datetime(int(s[0:4]),
                             int(s[5:7]),
                             int(s[8:10]),
                             int(s[11:13]),
                             int(s[14:16]),
                             int(s[17:19]),
                             int(s[20:23])*1000)

In [7]:
callList = sc.textFile("samplecalldata.txt")
callParts = callList.map(lambda l: l.split("|"))
callPartsFiltered=callParts.filter(lambda a:a[5]!='null')
callData = callPartsFiltered.map(lambda p: Row(startdate=parseDate(p[0]), 
                                   enddate=parseDate(p[1]),
                                   callingid=p[2],
                                   callerid=p[3],
                                   ignore=p[4],
                                   duration=float(p[5])))

schemaAll = sqlContext.createDataFrame(callData)
schemaAll.registerTempTable("calldata")

In [8]:
callerLimitedRDD = sqlContext.sql("SELECT * FROM calldata limit 10")
print (callerLimitedRDD.collect())

[Row(callerid='115129411', callingid='108314025', duration=21.675, enddate=datetime.datetime(2015, 4, 1, 6, 44, 14, 56000), ignore='16', startdate=datetime.datetime(2015, 4, 1, 6, 43, 29, 935000)), Row(callerid='115129411', callingid='108314025', duration=22.299, enddate=datetime.datetime(2015, 4, 1, 6, 44, 14, 145000), ignore='16', startdate=datetime.datetime(2015, 4, 1, 6, 43, 39, 833000)), Row(callerid='115129411', callingid='94458575', duration=5.642, enddate=datetime.datetime(2015, 4, 1, 6, 20, 12, 756000), ignore='16', startdate=datetime.datetime(2015, 4, 1, 6, 19, 49, 388000)), Row(callerid='115129411', callingid='115131225', duration=8.18, enddate=datetime.datetime(2015, 4, 1, 7, 2, 6, 996000), ignore='16', startdate=datetime.datetime(2015, 4, 1, 7, 1, 41, 4000)), Row(callerid='115129411', callingid='115131225', duration=7.64, enddate=datetime.datetime(2015, 4, 1, 7, 2, 6, 866000), ignore='16', startdate=datetime.datetime(2015, 4, 1, 7, 1, 40, 366000)), Row(callerid='115129411'

In [9]:
callOrderedRDD = sqlContext.sql("SELECT * FROM calldata order by duration desc")
print (callOrderedRDD.take(10))

[Row(callerid='120839579', callingid='118784385', duration=13340.302, enddate=datetime.datetime(2015, 4, 1, 4, 52, 41, 139000), ignore='16', startdate=datetime.datetime(2015, 4, 1, 1, 6, 6, 631000)), Row(callerid='126298287', callingid='40474532', duration=8663.86, enddate=datetime.datetime(2015, 4, 1, 0, 54, 11, 249000), ignore='16', startdate=datetime.datetime(2015, 3, 31, 23, 34, 24, 284000)), Row(callerid='53268699', callingid='128474306', duration=8038.379, enddate=datetime.datetime(2015, 4, 1, 2, 43, 2, 102000), ignore='16', startdate=datetime.datetime(2015, 4, 1, 0, 59, 8, 366000)), Row(callerid='124065798', callingid='85950285', duration=7678.295999999999, enddate=datetime.datetime(2015, 4, 1, 2, 2, 17, 15000), ignore='127', startdate=datetime.datetime(2015, 4, 1, 0, 0, 50, 318000)), Row(callerid='111741397', callingid='128596125', duration=7499.612, enddate=datetime.datetime(2015, 4, 1, 2, 28, 43, 156000), ignore='31', startdate=datetime.datetime(2015, 4, 1, 0, 23, 32, 672000)

In [10]:
callerFilteredRDD = sqlContext.sql("SELECT max(duration),callerid FROM calldata where duration <> 'null' group by callerid")
print (callerFilteredRDD.take(10))

[]


In [11]:
callerDateRDD = sqlContext.sql("SELECT * FROM calldata where enddate between '01.04.2015' and '02.04.2015' limit 10")
print (callerLimitedRDD.collect())

[Row(callerid='115129411', callingid='108314025', duration=21.675, enddate=datetime.datetime(2015, 4, 1, 6, 44, 14, 56000), ignore='16', startdate=datetime.datetime(2015, 4, 1, 6, 43, 29, 935000)), Row(callerid='115129411', callingid='108314025', duration=22.299, enddate=datetime.datetime(2015, 4, 1, 6, 44, 14, 145000), ignore='16', startdate=datetime.datetime(2015, 4, 1, 6, 43, 39, 833000)), Row(callerid='115129411', callingid='94458575', duration=5.642, enddate=datetime.datetime(2015, 4, 1, 6, 20, 12, 756000), ignore='16', startdate=datetime.datetime(2015, 4, 1, 6, 19, 49, 388000)), Row(callerid='115129411', callingid='115131225', duration=8.18, enddate=datetime.datetime(2015, 4, 1, 7, 2, 6, 996000), ignore='16', startdate=datetime.datetime(2015, 4, 1, 7, 1, 41, 4000)), Row(callerid='115129411', callingid='115131225', duration=7.64, enddate=datetime.datetime(2015, 4, 1, 7, 2, 6, 866000), ignore='16', startdate=datetime.datetime(2015, 4, 1, 7, 1, 40, 366000)), Row(callerid='115129411'

In [17]:
locList = sc.textFile("subscriberlocation.txt")
locParts = locList.map(lambda l: l.split("|"))
locData = locParts.map(lambda p: Row(callid=p[0], 
                                   city=p[1]))

schemaAll = sqlContext.createDataFrame(locData)
schemaAll.registerTempTable("locdata")

In [18]:
joinedRDD = sqlContext.sql("SELECT * FROM calldata a join locdata b ON (a.callerid=b.callid) order by a.duration desc")
print (joinedRDD.take(5))

[Row(callerid='123826198', callingid='42792148', duration=7480.249, enddate=datetime.datetime(2015, 4, 1, 1, 48, 36, 203000), ignore='null', startdate=datetime.datetime(2015, 3, 31, 23, 43, 24, 186000), callid='123826198', city='ISTANBUL'), Row(callerid='109265159', callingid='100915903', duration=7477.274, enddate=datetime.datetime(2015, 4, 1, 4, 35, 59, 84000), ignore='31', startdate=datetime.datetime(2015, 4, 1, 2, 31, 5, 384000), callid='109265159', city='KIRKLARELI'), Row(callerid='129193118', callingid='118862119', duration=7466.462, enddate=datetime.datetime(2015, 4, 1, 4, 11, 11, 805000), ignore='31', startdate=datetime.datetime(2015, 4, 1, 2, 6, 31, 319000), callid='129193118', city='ISTANBUL'), Row(callerid='46008428', callingid='107795615', duration=7461.854, enddate=datetime.datetime(2015, 4, 1, 2, 36, 23, 518000), ignore='null', startdate=datetime.datetime(2015, 4, 1, 0, 31, 44, 184000), callid='46008428', city='ISTANBUL'), Row(callerid='11696019', callingid='131529841', d

In [14]:
testRDD = sqlContext.sql("SELECT * FROM calldata a where a.callerid='123826198' order by a.duration desc")
print (testRDD.take(5))

[Row(callerid='123826198', callingid='42792148', duration=7480.249, enddate=datetime.datetime(2015, 4, 1, 1, 48, 36, 203000), ignore='null', startdate=datetime.datetime(2015, 3, 31, 23, 43, 24, 186000))]
