## Setup the Spark

In [0]:
# download and Java and Spark
! apt-get install openjdk-8-jdk-headless -qq > /dev/null
! wget -q https://archive.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
! tar xf spark-2.4.5-bin-hadoop2.7.tgz
! pip install -q findspark

In [0]:
# set the environment variables for spark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [0]:
! python -m pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/9a/5a/271c416c1c2185b6cb0151b29a91fff6fcaed80173c8584ff6d20e46b465/pyspark-2.4.5.tar.gz (217.8MB)
[K     |████████████████████████████████| 217.8MB 42kB/s 
[?25hCollecting py4j==0.10.7
[?25l  Downloading https://files.pythonhosted.org/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197kB)
[K     |████████████████████████████████| 204kB 44.8MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-2.4.5-py2.py3-none-any.whl size=218257927 sha256=9f1c54ec6cd0fe2ffe02d2e4520eccf22302b8a1a27c120825152b8dc1372e0b
  Stored in directory: /root/.cache/pip/wheels/bf/db/04/61d66a5939364e756eb1c1be4ec5bdce6e04047fc7929a3c3c
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.7 pyspark-2.4.5


In [0]:
from google.colab import drive
drive.mount('/gdrive')
%cd /gdrive

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /gdrive
/gdrive


### Importing

In [0]:
from pyspark import SparkContext, SQLContext

In [0]:
sc = SparkContext()

In [0]:
sqlContext = SQLContext(sc)

### Simple DataFrame

In [0]:
df = sqlContext.createDataFrame(data=[
    ('a', 4), 
    ('b', 2)
  ], 
  schema=['Column A', 'Column B']
)

In [0]:
# action
df.count()

2

In [0]:
df.head()

Row(Column A='a', Column B=4)

In [0]:
df.show()

+--------+--------+
|Column A|Column B|
+--------+--------+
|       a|       4|
|       b|       2|
+--------+--------+



### Load the Dataset

In [0]:
sc.textFile('data.csv', use_unicode=True).take(4)

['"date","time","size","r_version","r_arch","r_os","package","version","country","ip_id"',
 '"2015-12-12","13:42:10",257886,"3.2.2","i386","mingw32","HistData","0.7-6","CZ",1',
 '"2015-12-12","13:24:37",1236751,"3.2.2","x86_64","mingw32","RJSONIO","1.3-0","DE",2',
 '"2015-12-12","13:42:35",2077876,"3.2.2","i386","mingw32","UsingR","2.0-5","CZ",1']

In [0]:
data = sc.textFile('data.csv').map(lambda x: x.replace('"', '')).map(lambda x: x.split(','))

In [0]:
data.take(2)

[['date',
  'time',
  'size',
  'r_version',
  'r_arch',
  'r_os',
  'package',
  'version',
  'country',
  'ip_id'],
 ['2015-12-12',
  '13:42:10',
  '257886',
  '3.2.2',
  'i386',
  'mingw32',
  'HistData',
  '0.7-6',
  'CZ',
  '1']]

In [0]:
# this is an RDD
data.take(1)[0]

['date',
 'time',
 'size',
 'r_version',
 'r_arch',
 'r_os',
 'package',
 'version',
 'country',
 'ip_id']

In [0]:
# this is an DataFrame - creating dataframe from text file
df2 = sqlContext.createDataFrame(
    data=data.filter(lambda x: x[0] != 'date'),
    schema=data.take(1)[0]
)

In [0]:
# transformation 2 in here.
df2

DataFrame[date: string, time: string, size: string, r_version: string, r_arch: string, r_os: string, package: string, version: string, country: string, ip_id: string]

In [0]:
# serial way
df2.toPandas().sample()

Unnamed: 0,date,time,size,r_version,r_arch,r_os,package,version,country,ip_id
35233,2015-12-12,02:35:23,161227,3.2.2,x86_64,linux-gnu,gridBase,0.4-7,CA,2824


In [0]:
# this will save the the data apply transformations so far, and save in rdd or dataframe format
df2.persist()

DataFrame[date: string, time: string, size: string, r_version: string, r_arch: string, r_os: string, package: string, version: string, country: string, ip_id: string]

In [0]:
df2.show(2)

+----------+--------+-------+---------+------+-------+--------+-------+-------+-----+
|      date|    time|   size|r_version|r_arch|   r_os| package|version|country|ip_id|
+----------+--------+-------+---------+------+-------+--------+-------+-------+-----+
|2015-12-12|13:42:10| 257886|    3.2.2|  i386|mingw32|HistData|  0.7-6|     CZ|    1|
|2015-12-12|13:24:37|1236751|    3.2.2|x86_64|mingw32| RJSONIO|  1.3-0|     DE|    2|
+----------+--------+-------+---------+------+-------+--------+-------+-------+-----+
only showing top 2 rows



In [0]:
df2.columns

['date',
 'time',
 'size',
 'r_version',
 'r_arch',
 'r_os',
 'package',
 'version',
 'country',
 'ip_id']

In [0]:
df2.dtypes

[('date', 'string'),
 ('time', 'string'),
 ('size', 'string'),
 ('r_version', 'string'),
 ('r_arch', 'string'),
 ('r_os', 'string'),
 ('package', 'string'),
 ('version', 'string'),
 ('country', 'string'),
 ('ip_id', 'string')]

### Preprocessing

In [0]:
from pyspark.sql.types import IntegerType, DateType

In [0]:
df3 = df2\
.withColumn('size', df2.size.cast(IntegerType())) \
.withColumn('date', df2.date.cast(DateType()))

In [0]:
# the new dataframe
df3.take(1)

[Row(date=datetime.date(2015, 12, 12), time='13:42:10', size=257886, r_version='3.2.2', r_arch='i386', r_os='mingw32', package='HistData', version='0.7-6', country='CZ', ip_id='1')]

In [0]:
# for comparison, the scnd dataframe look
df2.take(1)

[Row(date='2015-12-12', time='13:42:10', size='257886', r_version='3.2.2', r_arch='i386', r_os='mingw32', package='HistData', version='0.7-6', country='CZ', ip_id='1')]

In [0]:
df3.dtypes

[('date', 'date'),
 ('time', 'string'),
 ('size', 'int'),
 ('r_version', 'string'),
 ('r_arch', 'string'),
 ('r_os', 'string'),
 ('package', 'string'),
 ('version', 'string'),
 ('country', 'string'),
 ('ip_id', 'string')]

In [0]:
df3.show(2)

+----------+--------+-------+---------+------+-------+--------+-------+-------+-----+
|      date|    time|   size|r_version|r_arch|   r_os| package|version|country|ip_id|
+----------+--------+-------+---------+------+-------+--------+-------+-------+-----+
|2015-12-12|13:42:10| 257886|    3.2.2|  i386|mingw32|HistData|  0.7-6|     CZ|    1|
|2015-12-12|13:24:37|1236751|    3.2.2|x86_64|mingw32| RJSONIO|  1.3-0|     DE|    2|
+----------+--------+-------+---------+------+-------+--------+-------+-------+-----+
only showing top 2 rows



#### Renaming Columns

In [0]:
df4 = df3.withColumnRenamed('size', 'sizeeee')

In [0]:
df4.show(1)

+----------+--------+-------+---------+------+-------+--------+-------+-------+-----+
|      date|    time|sizeeee|r_version|r_arch|   r_os| package|version|country|ip_id|
+----------+--------+-------+---------+------+-------+--------+-------+-------+-----+
|2015-12-12|13:42:10| 257886|    3.2.2|  i386|mingw32|HistData|  0.7-6|     CZ|    1|
+----------+--------+-------+---------+------+-------+--------+-------+-------+-----+
only showing top 1 row



In [0]:
df3.sort(df3.size.desc()).show(1) # df3.size.asc()

+----------+--------+--------+---------+------+---------+---------+-------+-------+-----+
|      date|    time|    size|r_version|r_arch|     r_os|  package|version|country|ip_id|
+----------+--------+--------+---------+------+---------+---------+-------+-------+-----+
|2015-12-12|02:28:49|68736865|    3.3.0|x86_64|linux-gnu|TCGA2STAT|    1.2|     US| 2700|
+----------+--------+--------+---------+------+---------+---------+-------+-------+-----+
only showing top 1 row



#### Aggregation

In [0]:
df3.groupBy('package').count().sort('count', ascending=False).show(5)

+-------+-----+
|package|count|
+-------+-----+
|   Rcpp| 4783|
|ggplot2| 3913|
|stringi| 3748|
|stringr| 3449|
|   plyr| 3436|
+-------+-----+
only showing top 5 rows



In [0]:
df3.filter(df3.size > 1000).count()

374873

In [0]:
# TODO: look into this - is it not an action?
df3.groupBy('package').count().show(4)

+----------+-----+
|   package|count|
+----------+-----+
|   TH.data|  532|
|     sharx|   20|
|   spssDDI|    8|
|xpose4data|    5|
+----------+-----+
only showing top 4 rows



### Spark SQL

In [0]:
# this creates temporary table withing 'sc' context.
df3.createOrReplaceGlobalTempView('r_packages')

AttributeError: ignored

In [0]:
sqlContext = SQLContext(sc)

In [0]:
sqlContext.sql('select package from global_temp.r_packages')

DataFrame[package: string]

In [0]:
sqlContext.table('global_temp.r_packages').cache()

DataFrame[date: date, time: string, size: int, r_version: string, r_arch: string, r_os: string, package: string, version: string, country: string, ip_id: string]

In [0]:
sqlContext.sql('select package, size, r_os from global_temp.r_packages where size > 1000 order by 2 desc').show(3)

+---------+--------+---------+
|  package|    size|     r_os|
+---------+--------+---------+
|TCGA2STAT|68736865|  mingw32|
|TCGA2STAT|68736865|  mingw32|
|TCGA2STAT|68736865|linux-gnu|
+---------+--------+---------+
only showing top 3 rows



In [0]:
query_results = sqlContext.sql("""
  select package, r_os, max(size)
  from global_temp.r_packages 
  where size > 1000 
  group by package, r_os
  having package = 'TCGA2STAT' -- r_os = 'linux-gnu'
  order by 2 desc
""")

In [0]:
query_results.show(3)

+---------+------------+---------+
|  package|        r_os|max(size)|
+---------+------------+---------+
|TCGA2STAT|     mingw32| 68736865|
|TCGA2STAT|   linux-gnu| 68736865|
|TCGA2STAT|darwin13.4.0| 68736862|
+---------+------------+---------+
only showing top 3 rows



In [0]:
# this is RDD, doesn't schema, which will show me the columns
query_results.rdd.map(lambda x: x.package + ' - ' + x.r_os).take(2)

['data.table - solaris2.10', 'swirl - mingw32']

In [0]:
# This is from dataframe, do the transformation and take 2 
# TODO: do this
query_results.select(['package', 'r_os']).map(lambda x: x.package + ' - ' + x.r_os).take(2)

AttributeError: ignored