In [38]:
import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.functions import *

spark = SparkSession.builder\
       .appName("Pyspark_dataframe")\
       .getOrCreate()

In [6]:
raw_data = sc.textFile('file:///D:\\data\\kddcup.txt').cache()
raw_data.take(5)

['0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,219,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,39,39,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
 '0,tcp,http,SF,217,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,0.00,0.00,49,49,1.00,0.00,0.02,0.00,0.00,0.00,0.00,0.00,normal.']

In [8]:
#schema inference

csv_data = raw_data.map(lambda l: l.split(","))
row_data=csv_data.map(lambda p: Row(
    duration=int(p[0]), 
    protocol_type=p[1],
    service=p[2],
    flag=p[3],
    src_bytes=int(p[4]),
    dst_bytes=int(p[5])
    )
)

In [9]:
interactions_df = spark.createDataFrame(row_data)

#interactions_df.show()

interactions_df.createOrReplaceTempView("interactions")
#interactions_df.registerTempTable("interactions")



In [10]:
spark.sql("select * from interactions").show()

+---------+--------+----+-------------+-------+---------+
|dst_bytes|duration|flag|protocol_type|service|src_bytes|
+---------+--------+----+-------------+-------+---------+
|     5450|       0|  SF|          tcp|   http|      181|
|      486|       0|  SF|          tcp|   http|      239|
|     1337|       0|  SF|          tcp|   http|      235|
|     1337|       0|  SF|          tcp|   http|      219|
|     2032|       0|  SF|          tcp|   http|      217|
|     2032|       0|  SF|          tcp|   http|      217|
|     1940|       0|  SF|          tcp|   http|      212|
|     4087|       0|  SF|          tcp|   http|      159|
|      151|       0|  SF|          tcp|   http|      210|
|      786|       0|  SF|          tcp|   http|      212|
|      624|       0|  SF|          tcp|   http|      210|
|     1985|       0|  SF|          tcp|   http|      177|
|      773|       0|  SF|          tcp|   http|      222|
|     1169|       0|  SF|          tcp|   http|      256|
|      259|   

In [11]:
interactions_df.printSchema()

root
 |-- dst_bytes: long (nullable = true)
 |-- duration: long (nullable = true)
 |-- flag: string (nullable = true)
 |-- protocol_type: string (nullable = true)
 |-- service: string (nullable = true)
 |-- src_bytes: long (nullable = true)



In [12]:
# Select tcp network interactions with more than 1 second duration and no transfer from destination
tcp_interactions = spark.sql("""
    SELECT duration, dst_bytes FROM interactions WHERE protocol_type = 'tcp' AND duration > 1000 AND dst_bytes = 0
""")
tcp_interactions.show()

+--------+---------+
|duration|dst_bytes|
+--------+---------+
|    5057|        0|
|    5059|        0|
|    5051|        0|
|    5056|        0|
|    5051|        0|
|    5039|        0|
|    5062|        0|
|    5041|        0|
|    5056|        0|
|    5064|        0|
|    5043|        0|
|    5061|        0|
|    5049|        0|
|    5061|        0|
|    5048|        0|
|    5047|        0|
|    5044|        0|
|    5063|        0|
|    5068|        0|
|    5062|        0|
+--------+---------+
only showing top 20 rows



In [13]:
interactions_df.head()

Row(dst_bytes=5450, duration=0, flag='SF', protocol_type='tcp', service='http', src_bytes=181)

In [14]:
interactions_df.first()

Row(dst_bytes=5450, duration=0, flag='SF', protocol_type='tcp', service='http', src_bytes=181)

In [15]:
# print out elements in each row

tcp_interactions_out=tcp_interactions.rdd.map(lambda p:'duration:{}:dest_bytes:{}'.format(p.duration,p.dst_bytes))

for t in tcp_interactions_out.collect():
    print(t)
    


duration:5057:dest_bytes:0
duration:5059:dest_bytes:0
duration:5051:dest_bytes:0
duration:5056:dest_bytes:0
duration:5051:dest_bytes:0
duration:5039:dest_bytes:0
duration:5062:dest_bytes:0
duration:5041:dest_bytes:0
duration:5056:dest_bytes:0
duration:5064:dest_bytes:0
duration:5043:dest_bytes:0
duration:5061:dest_bytes:0
duration:5049:dest_bytes:0
duration:5061:dest_bytes:0
duration:5048:dest_bytes:0
duration:5047:dest_bytes:0
duration:5044:dest_bytes:0
duration:5063:dest_bytes:0
duration:5068:dest_bytes:0
duration:5062:dest_bytes:0
duration:5046:dest_bytes:0
duration:5052:dest_bytes:0
duration:5044:dest_bytes:0
duration:5054:dest_bytes:0
duration:5039:dest_bytes:0
duration:5058:dest_bytes:0
duration:5051:dest_bytes:0
duration:5032:dest_bytes:0
duration:5063:dest_bytes:0
duration:5040:dest_bytes:0
duration:5051:dest_bytes:0
duration:5066:dest_bytes:0
duration:5044:dest_bytes:0
duration:5051:dest_bytes:0
duration:5036:dest_bytes:0
duration:5055:dest_bytes:0
duration:2426:dest_bytes:0
d

In [16]:
tcp_interactions_out.take(5)

['duration:5057:dest_bytes:0',
 'duration:5059:dest_bytes:0',
 'duration:5051:dest_bytes:0',
 'duration:5056:dest_bytes:0',
 'duration:5051:dest_bytes:0']

In [17]:
df_data=tcp_interactions_out.map(lambda x: x.split(':'))
df_data.take(5)

[['duration', '5057', 'dest_bytes', '0'],
 ['duration', '5059', 'dest_bytes', '0'],
 ['duration', '5051', 'dest_bytes', '0'],
 ['duration', '5056', 'dest_bytes', '0'],
 ['duration', '5051', 'dest_bytes', '0']]

In [18]:
df_1=df_data.map(lambda y: Row(duration=y[1],dest_bytes=y[3]))
df=spark.createDataFrame(df_1).show()

+----------+--------+
|dest_bytes|duration|
+----------+--------+
|         0|    5057|
|         0|    5059|
|         0|    5051|
|         0|    5056|
|         0|    5051|
|         0|    5039|
|         0|    5062|
|         0|    5041|
|         0|    5056|
|         0|    5064|
|         0|    5043|
|         0|    5061|
|         0|    5049|
|         0|    5061|
|         0|    5048|
|         0|    5047|
|         0|    5044|
|         0|    5063|
|         0|    5068|
|         0|    5062|
+----------+--------+
only showing top 20 rows



In [25]:
dat = sc.textFile("file:///D:\\data\\2015-12-12.csv").map(lambda y:y.replace('"','')).map(lambda x:x.split(','))
dat.take(5) # first n rows

[['date',
  'time',
  'size',
  'r_version',
  'r_arch',
  'r_os',
  'package',
  'version',
  'country',
  'ip_id'],
 ['2015-12-12',
  '13:42:10',
  '257886',
  '3.2.2',
  'i386',
  'mingw32',
  'HistData',
  '0.7-6',
  'CZ',
  '1'],
 ['2015-12-12',
  '13:24:37',
  '1236751',
  '3.2.2',
  'x86_64',
  'mingw32',
  'RJSONIO',
  '1.3-0',
  'DE',
  '2'],
 ['2015-12-12',
  '13:42:35',
  '2077876',
  '3.2.2',
  'i386',
  'mingw32',
  'UsingR',
  '2.0-5',
  'CZ',
  '1'],
 ['2015-12-12',
  '13:42:01',
  '266724',
  '3.2.2',
  'i386',
  'mingw32',
  'gridExtra',
  '2.0.0',
  'CZ',
  '1']]

In [31]:
dat.filter(lambda x:x[0]=='date').collect()

[['date',
  'time',
  'size',
  'r_version',
  'r_arch',
  'r_os',
  'package',
  'version',
  'country',
  'ip_id']]

In [34]:
# creating data frame from rdd where column names are one of the element of rdd
DT2 = spark.createDataFrame(dat.filter(lambda x:x[0]!='date'),dat.filter(lambda x:x[0]=='date').collect()[0])
# for rdd to be part of data frame contents,select all elements (rdd element) from rdd whose 1st element is NOT equal to string 'data'.
# for rdd to be part of data frame schema (list of column names),select all elements from rdd whose 1st element is equal to string 'data'.
#.collect()[0] because 1st element of rdd(ie column names list) is already a list,so to select list value from list
DT2.show(5)

+----------+--------+-------+---------+------+-------+---------+-------+-------+-----+
|      date|    time|   size|r_version|r_arch|   r_os|  package|version|country|ip_id|
+----------+--------+-------+---------+------+-------+---------+-------+-------+-----+
|2015-12-12|13:42:10| 257886|    3.2.2|  i386|mingw32| HistData|  0.7-6|     CZ|    1|
|2015-12-12|13:24:37|1236751|    3.2.2|x86_64|mingw32|  RJSONIO|  1.3-0|     DE|    2|
|2015-12-12|13:42:35|2077876|    3.2.2|  i386|mingw32|   UsingR|  2.0-5|     CZ|    1|
|2015-12-12|13:42:01| 266724|    3.2.2|  i386|mingw32|gridExtra|  2.0.0|     CZ|    1|
|2015-12-12|13:00:21|3687766|       NA|    NA|     NA|     lme4| 1.1-10|     DE|    3|
+----------+--------+-------+---------+------+-------+---------+-------+-------+-----+
only showing top 5 rows



In [35]:
DT2.printSchema()

root
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- size: string (nullable = true)
 |-- r_version: string (nullable = true)
 |-- r_arch: string (nullable = true)
 |-- r_os: string (nullable = true)
 |-- package: string (nullable = true)
 |-- version: string (nullable = true)
 |-- country: string (nullable = true)
 |-- ip_id: string (nullable = true)



In [39]:
DT3 = DT2.withColumn("size", DT2["size"].cast(IntegerType())) #withColumn() to change datatype of column
DT3 = DT3.withColumn("date", DT3["date"].cast(DateType()))

In [40]:
DT3.dtypes

[('date', 'date'),
 ('time', 'string'),
 ('size', 'int'),
 ('r_version', 'string'),
 ('r_arch', 'string'),
 ('r_os', 'string'),
 ('package', 'string'),
 ('version', 'string'),
 ('country', 'string'),
 ('ip_id', 'string')]

In [43]:
DT4 = DT2.withColumnRenamed("size", "size_new")
DT4.show(5)

+----------+--------+--------+---------+------+-------+---------+-------+-------+-----+
|      date|    time|size_new|r_version|r_arch|   r_os|  package|version|country|ip_id|
+----------+--------+--------+---------+------+-------+---------+-------+-------+-----+
|2015-12-12|13:42:10|  257886|    3.2.2|  i386|mingw32| HistData|  0.7-6|     CZ|    1|
|2015-12-12|13:24:37| 1236751|    3.2.2|x86_64|mingw32|  RJSONIO|  1.3-0|     DE|    2|
|2015-12-12|13:42:35| 2077876|    3.2.2|  i386|mingw32|   UsingR|  2.0-5|     CZ|    1|
|2015-12-12|13:42:01|  266724|    3.2.2|  i386|mingw32|gridExtra|  2.0.0|     CZ|    1|
|2015-12-12|13:00:21| 3687766|       NA|    NA|     NA|     lme4| 1.1-10|     DE|    3|
+----------+--------+--------+---------+------+-------+---------+-------+-------+-----+
only showing top 5 rows



In [53]:
DT4.sort(DT4.size_new).show(10) #using sort()

#DT4.sort(DT4.size_new.asc()).show(10)

+----------+--------+--------+---------+------+---------+------------+-------+-------+-----+
|      date|    time|size_new|r_version|r_arch|     r_os|     package|version|country|ip_id|
+----------+--------+--------+---------+------+---------+------------+-------+-------+-----+
|2015-12-12|23:52:09|  100004|       NA|    NA|       NA|ConnMatTools|  0.1.5|     CN| 4571|
|2015-12-12|15:17:32| 1000127|    3.2.3|x86_64|linux-gnu|       SMVar|  1.3.3|     KR| 4986|
|2015-12-12|16:32:35| 1000127|    3.1.3|  i386|  mingw32|       SMVar|  1.3.3|     AE|  556|
|2015-12-12|04:54:48| 1000127|    3.1.0|x86_64|  mingw32|       SMVar|  1.3.3|     US| 1652|
|2015-12-12|20:35:23| 1000127|    3.2.3|x86_64|linux-gnu|       SMVar|  1.3.3|     US| 4438|
|2015-12-12|14:14:20| 1000127|    3.2.2|x86_64|  mingw32|       SMVar|  1.3.3|     KR|  511|
|2015-12-12|20:33:54| 1000127|    3.2.3|x86_64|linux-gnu|       SMVar|  1.3.3|     CN|   41|
|2015-12-12|18:21:19| 1000127|    3.2.2|x86_64|  mingw32|       SMVar|

In [55]:
DT4.filter(DT4['size_new'] <1000).count() / DT4.count() #filter() for dataframe

0.11161009458040756

In [56]:
DT3.filter(DT3['package'] == "ggplot2").count() / DT3.count() 

0.009273193054466087

In [57]:
DT3.groupBy("package").count().show(5) #creates a groupBy column and a aggregation function col by func name

+----------+-----+
|   package|count|
+----------+-----+
|   TH.data|  532|
|     sharx|   20|
|   spssDDI|    8|
|xpose4data|    5|
|websockets|    3|
+----------+-----+
only showing top 5 rows



In [58]:
package_count = DT3.groupBy("package").count().sort("count", ascending = False)
package_count.show(5)

+-------+-----+
|package|count|
+-------+-----+
|   Rcpp| 4783|
|ggplot2| 3913|
|stringi| 3748|
|stringr| 3449|
|   plyr| 3436|
+-------+-----+
only showing top 5 rows



In [70]:
package_count.dtypes

[('package', 'string'), ('count', 'bigint')]

In [75]:
DT3.count()

421969

In [103]:
package_count.filter(package_count.package == 'DT').show() #show package of type 'DT' and its count

+-------+-----+-----+
|package|count| perc|
+-------+-----+-----+
|     DT|   97|0.023|
+-------+-----+-----+



In [86]:
def derive_perc(x):
     return str(round(x * 100,3))+'%'

In [104]:
#derive_perc = udf(lambda x: str(round(x * 100, 3)) + "%")
# or 
#@udf
#def derive_perc(x):
     #return(str(round(x * 100, 3)) + "%")

#package_count = package_count.withColumn("perc", derive_perc(package_count['count'] / DT3.count()))

#package_count.show(10) #not working for unknown reason

In [139]:
   
package_count = package_count.withColumn('perc', round(package_count['count']*100 / DT3.count(),3))

package_count.show(10)

+------------+-----+-----+
|     package|count| perc|
+------------+-----+-----+
|        Rcpp| 4783|1.133|
|     ggplot2| 3913|0.927|
|     stringi| 3748|0.888|
|     stringr| 3449|0.817|
|        plyr| 3436|0.814|
|    magrittr| 3265|0.774|
|      digest| 3223|0.764|
|    reshape2| 3205| 0.76|
|RColorBrewer| 3046|0.722|
|      scales| 3007|0.713|
+------------+-----+-----+
only showing top 10 rows



#We can build a view with the Spark DataFrame, then we can SQL syntax to further process our data.

#You may notice there are two ways to build a view,

#DF.createGlobalTempView (or DF.createOrReplaceGlobalTempView): create a global temporary view
#DF.createTempView (or DF.createOrReplaceTempView): create a local temporary view

#The main difference between them is the lifetime. The lifetime of a global temporary view is tied to the Spark application, while lifetime of a local temporary view is tied to the SparkSession.

In [131]:
package_count.createOrReplaceTempView("package_count_sql_table")

In [132]:
spark.sql("select perc\
                               from package_count_sql_table \
                               where package = 'DT'").show()

+-----+
| perc|
+-----+
|0.023|
+-----+



In [133]:
qs=spark.sql("select * \
                                from package_count_sql_table \
                                where count > 1000 \
                                order by count desc")
qs.show(5)

+-------+-----+-----+
|package|count| perc|
+-------+-----+-----+
|   Rcpp| 4783|1.133|
|ggplot2| 3913|0.927|
|stringi| 3748|0.888|
|stringr| 3449|0.817|
|   plyr| 3436|0.814|
+-------+-----+-----+
only showing top 5 rows



In [134]:
print(qs.show(5)) # same as show()

+-------+-----+-----+
|package|count| perc|
+-------+-----+-----+
|   Rcpp| 4783|1.133|
|ggplot2| 3913|0.927|
|stringi| 3748|0.888|
|stringr| 3449|0.817|
|   plyr| 3436|0.814|
+-------+-----+-----+
only showing top 5 rows

None


In [136]:
# Use the Spark RDD way to process the results from Spark SQL query result

qs.rdd.map(lambda x:x.package + ":" + str(x.perc)).take(10) # qs.rdd.map(lambda x:x['package'] + ":" + str(x['perc'])).take(10)

['Rcpp:1.133',
 'ggplot2:0.927',
 'stringi:0.888',
 'stringr:0.817',
 'plyr:0.814',
 'magrittr:0.774',
 'digest:0.764',
 'reshape2:0.76',
 'RColorBrewer:0.722',
 'scales:0.713']