### There are cases when you have to parse data before loading into DataFrame
* Data is not of csv format.
* Further parsing of data elements is required
* ...


#### Typical workflow
1. read data as an RDD of text lines.
2. split each line into a list of elements.
3. parse each element (if needed)
4. convert each line into a **_sqlcontext Row_**, i.e. RDD of **_Rows_**
5. convert the RDD into a DataFrame


##### *Row* is a list of key/value pairs represented as
$ [ col_1=value_1, col_2=value_2, ..., col_n = value_n ]$

#### Example: Analyzing Web log data

In [5]:
from pyspark.sql import Row
log_file = sc.textFile("./data/log_file.txt")

In [7]:
import shlex
splits = log_file.map(lambda row: shlex.split(row))
splits.takeSample(True, 5)

[['bettong.client.uq.oz.au', '[30:01:46:25]', 'GET /enviro/gif/mail.gif HTTP/1.0', '200', '959'], ['localhost.mpc.wa.gov.au', '[30:02:55:44]', 'GET /logos/us-flag.gif HTTP/1.0', '200', '2788'], ['epsongw3.epson.co.jp', '[30:04:29:45]', 'GET /docs/OCEPAterms/aaad.html HTTP/1.0', '200', '14536'], ['sanipc16.sani.chalmers.se', '[30:02:01:36]', 'GET /logos/us-flag.gif HTTP/1.0', '304', '0'], ['204.62.245.32', '[30:02:14:29]', 'GET /index.html HTTP/1.0', '200', '4889']]

In [9]:
def create_schema(row):
  ip = row[0]
  date = row[1].replace('[', '').replace(']', '')
  tokens = row[2].split(' ')
  protocol = tokens[0]
  url = tokens[1].split('?')[0]
  status = row[3]
  time = None if row[4] == '-' else int(row[4]) 
  return Row(ip=ip, date=date, protocol=protocol, url=url, status=status, time=time)

In [10]:
row_data = splits.map(create_schema)
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)


In [11]:
schema_DF = sqlContext.createDataFrame(row_data)
schema_DF.createOrReplaceTempView('logs')
sample = sqlContext.sql('SELECT * FROM logs LIMIT 10').collect()   # returns list
for row in sample:
    print row

Row(date=u'29:23:53:25', ip=u'141.243.1.172', protocol=u'GET', status=u'200', time=1497, url=u'/Software.html')
Row(date=u'29:23:53:36', ip=u'query2.lycos.cs.cmu.edu', protocol=u'GET', status=u'200', time=1325, url=u'/Consumer.html')
Row(date=u'29:23:53:53', ip=u'tanuki.twics.com', protocol=u'GET', status=u'200', time=1014, url=u'/News.html')
Row(date=u'29:23:54:15', ip=u'wpbfl2-45.gate.net', protocol=u'GET', status=u'200', time=4889, url=u'/')
Row(date=u'29:23:54:16', ip=u'wpbfl2-45.gate.net', protocol=u'GET', status=u'200', time=2624, url=u'/icons/circle_logo_small.gif')
Row(date=u'29:23:54:18', ip=u'wpbfl2-45.gate.net', protocol=u'GET', status=u'200', time=935, url=u'/logos/small_gopher.gif')
Row(date=u'29:23:54:19', ip=u'140.112.68.165', protocol=u'GET', status=u'200', time=2788, url=u'/logos/us-flag.gif')
Row(date=u'29:23:54:19', ip=u'wpbfl2-45.gate.net', protocol=u'GET', status=u'200', time=124, url=u'/logos/small_ftp.gif')
Row(date=u'29:23:54:19', ip=u'wpbfl2-45.gate.net', proto

In [12]:
schema_DF.show()

+-----------+--------------------+--------+------+-----+--------------------+
|       date|                  ip|protocol|status| time|                 url|
+-----------+--------------------+--------+------+-----+--------------------+
|29:23:53:25|       141.243.1.172|     GET|   200| 1497|      /Software.html|
|29:23:53:36|query2.lycos.cs.c...|     GET|   200| 1325|      /Consumer.html|
|29:23:53:53|    tanuki.twics.com|     GET|   200| 1014|          /News.html|
|29:23:54:15|  wpbfl2-45.gate.net|     GET|   200| 4889|                   /|
|29:23:54:16|  wpbfl2-45.gate.net|     GET|   200| 2624|/icons/circle_log...|
|29:23:54:18|  wpbfl2-45.gate.net|     GET|   200|  935|/logos/small_goph...|
|29:23:54:19|      140.112.68.165|     GET|   200| 2788|  /logos/us-flag.gif|
|29:23:54:19|  wpbfl2-45.gate.net|     GET|   200|  124|/logos/small_ftp.gif|
|29:23:54:19|  wpbfl2-45.gate.net|     GET|   200|  156|     /icons/book.gif|
|29:23:54:19|  wpbfl2-45.gate.net|     GET|   200| 2788|  /logos

### Most visited URLs?

In [15]:
url_access = sqlContext.sql('''SELECT url, count(*) as counts FROM logs GROUP BY url
  ORDER BY counts DESC LIMIT 10''')#.collect()

url_access.show()
#for row in url_access:
#    print row
    


+--------------------+------+
|                 url|counts|
+--------------------+------+
|/icons/circle_log...|   128|
|                   /|   103|
|  /logos/us-flag.gif|    74|
|/logos/small_ftp.gif|    74|
|     /icons/book.gif|    71|
|/logos/small_goph...|    71|
|    /icons/ok2-0.gif|    69|
|/waisicons/unknow...|    62|
|   /cgi-bin/waisgate|    43|
|/cgi-bin/waisgate...|    26|
+--------------------+------+



### Most frequent visitors?

In [24]:
visitors = sqlContext.sql('''SELECT ip, count(*) as counts FROM logs GROUP BY ip
  ORDER BY counts DESC LIMIT 10''').collect()
for row in visitors:
    print row

Row(ip=u'ix-eve-wa2-02.ix.netcom.com', counts=126)
Row(ip=u'pm2-3.niia.net', counts=81)
Row(ip=u'202.32.50.6', counts=73)
Row(ip=u'bettong.client.uq.oz.au', counts=64)
Row(ip=u'macn352.riken.go.jp', counts=50)
Row(ip=u'sfsp03.slip.net', counts=47)
Row(ip=u'h46.s101.wcc.com', counts=46)
Row(ip=u'cragateway.cra.com.au', counts=44)
Row(ip=u'port11.annex1.naples.net', counts=43)
Row(ip=u'hmu4.cs.auckland.ac.nz', counts=39)


In [17]:
td = schema_DF.select("ip","time").filter(schema_DF.time > 3000).groupBy('ip').count().orderBy('count', ascending=False).show()


+--------------------+-----+
|                  ip|count|
+--------------------+-----+
|      pm2-3.niia.net|   19|
|hmu4.cs.auckland....|   19|
|ix-eve-wa2-02.ix....|   18|
|     sfsp03.slip.net|   15|
|      168.95.125.161|   14|
|     203.251.228.110|   14|
|epsongw3.epson.co.jp|   13|
|  infoman.cisnet.com|   13|
|bettong.client.uq...|   12|
|       161.122.12.78|   11|
| macn352.riken.go.jp|   10|
|port11.annex1.nap...|   10|
|nnex02.ppp.uni-ma...|    9|
|cnts4p16.uwaterlo...|    9|
|    h46.s101.wcc.com|    9|
|daddylongleg.euro...|    9|
|  ext-ns.dpie.gov.au|    8|
|        132.74.12.10|    8|
|www-c1.proxy.aol.com|    7|
|         202.32.50.6|    7|
+--------------------+-----+
only showing top 20 rows



In [33]:
print td

Name: org.apache.toree.interpreter.broker.BrokerException
Message: Traceback (most recent call last):
  File "/var/folders/qy/5hsc8rns55bd4yqn51r9dn7czrp35j/T/kernel-PySpark-56b458e8-56da-4dee-be2f-7cbd9f102dbf/pyspark_runner.py", line 189, in <module>
    eval(compiled_code)
  File "<string>", line 1, in <module>
AttributeError: 'GroupedData' object has no attribute 'collect'

StackTrace: org.apache.toree.interpreter.broker.BrokerState$$anonfun$markFailure$1.apply(BrokerState.scala:163)
org.apache.toree.interpreter.broker.BrokerState$$anonfun$markFailure$1.apply(BrokerState.scala:163)
scala.Option.foreach(Option.scala:257)
org.apache.toree.interpreter.broker.BrokerState.markFailure(BrokerState.scala:162)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
py4j.reflect