In [1]:
sc

<pyspark.context.SparkContext at 0x7f8ccc526890>

In [2]:
from pyspark.sql.types import Row
from datetime import datetime
#row is spark object use to represent single record in dataframe

###### convert python list into RDD

In [3]:
simple_data = sc.parallelize([1,'abc'])
simple_data

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:484

In [4]:
simple_data.first()#action

1

In [5]:
simple_data.collect()#action

[1, 'abc']

In [6]:
simple_data.take(2)#action

[1, 'abc']

df = simple_data.toDF()
#it give error bcoz simple_data has entity of different type

In [10]:
data = sc.parallelize([[1,'abc'],[2,'xyz']])

In [11]:
data

ParallelCollectionRDD[5] at parallelize at PythonRDD.scala:484

In [12]:
data.collect()

[[1, 'abc'], [2, 'xyz']]

In [13]:
df = data.toDF()

In [14]:
df

DataFrame[_1: bigint, _2: string]

In [15]:
df.collect()
#we can use RDD's Operations on dataframe

[Row(_1=1, _2=u'abc'), Row(_1=2, _2=u'xyz')]

In [16]:
df.first()
#we can use RDD's Operations on dataframe

Row(_1=1, _2=u'abc')

In [17]:
df.take(1)
#we can use RDD's Operations on dataframe

[Row(_1=1, _2=u'abc')]

In [19]:
df.show()
#show() method return result in tabular format and column name is generated automatically

+---+---+
| _1| _2|
+---+---+
|  1|abc|
|  2|xyz|
+---+---+



###### If we want to create create dataframe with column name then we can use Row() object to give name to column in dataframe

In [25]:
record = sc.parallelize([Row(id=1,name='abc'),
                         Row(id=2,name='xyz')])

In [26]:
record

ParallelCollectionRDD[22] at parallelize at PythonRDD.scala:484

In [27]:
record.collect()

[Row(id=1, name='abc'), Row(id=2, name='xyz')]

In [28]:
df = record.toDF()

In [29]:
df.show()

+---+----+
| id|name|
+---+----+
|  1| abc|
|  2| xyz|
+---+----+



#### Complex Data Types

In [30]:
record = sc.parallelize([Row(
                                col_list = [1,2,3],
                                col_dict = {"k1":0},
                                col_row = Row(1,2,3),
                                col_time = datetime(2014,11,8)),
                        Row(
                                col_list = [4,5,6],
                                col_dict = {"k2":0},
                                col_row = Row(4,5,6),
                                col_time = datetime(2014,11,8))
                        
                        
                        ])

In [31]:
df = record.toDF()
df.show()

+------------+---------+-------+--------------------+
|    col_dict| col_list|col_row|            col_time|
+------------+---------+-------+--------------------+
|Map(k1 -> 0)|[1, 2, 3]|[1,2,3]|2014-11-08 00:00:...|
|Map(k2 -> 0)|[4, 5, 6]|[4,5,6]|2014-11-08 00:00:...|
+------------+---------+-------+--------------------+



#### SQLCONTEXT

In [32]:
sqlcontext = SQLContext(sc)
#it take sc object as input parameter
#it is wrapper around sparkContext
#provide additional SQL Functionality

In [33]:
sqlcontext

<pyspark.sql.context.SQLContext at 0x7ffb41fbddd0>

In [34]:
data = sqlcontext.range(5)
#range function of SQLCONTEXT create dataframe of one column id with 5 values in it from 0-4

In [35]:
data.show()

+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
+---+



In [36]:
data = [('abc',1),
        ('xyz',2)
       ]

In [37]:
data

[('abc', 1), ('xyz', 2)]

In [38]:
sqlcontext.createDataFrame(data).show()
#it allow us to create dataframe directly from input data i.e data
#instead of create RDD from Data then Create DataFrame from That RDD
#here name of column is explicitly created 

+---+---+
| _1| _2|
+---+---+
|abc|  1|
|xyz|  2|
+---+---+



In [40]:
sqlcontext.createDataFrame(data,['NAME','Id']).show()
#here we give input data as well as name of column name



+----+---+
|NAME| Id|
+----+---+
| abc|  1|
| xyz|  2|
+----+---+



###### All operations Together

In [62]:
record = sc.parallelize([Row('[1,2,3]',
                             '{"k1":0}',
                             'Row(1,2,3)',
                             'datetime(2014,11,8))')])

record = sc.parallelize([Row(1,2,3,4),
                         Row(5,6,7,8)])

In [63]:
columns_name = Row('col_list','col_dict','col_row','col_time')

In [64]:
data = record.map(lambda x : columns_name(*x))

In [65]:
data

PythonRDD[65] at RDD at PythonRDD.scala:49

In [66]:
data.collect()

[Row(col_list='[1,2,3]', col_dict='{"k1":0}', col_row='Row(1,2,3)', col_time='datetime(2014,11,8))')]

###### convert DataFrame to RDD

In [68]:
data

PythonRDD[65] at RDD at PythonRDD.scala:49

In [69]:
df.rdd.map(lambda x : x.col_dict).collect()

[{u'k1': 0}, {u'k2': 0}]

In [70]:
df.select('col_dict').show()

+------------+
|    col_dict|
+------------+
|Map(k1 -> 0)|
|Map(k2 -> 0)|
+------------+



###### DataFrame is not support map operation so to do opertaion on it we can use "withColumn"

In [71]:
df.show()

+------------+---------+-------+--------------------+
|    col_dict| col_list|col_row|            col_time|
+------------+---------+-------+--------------------+
|Map(k1 -> 0)|[1, 2, 3]|[1,2,3]|2014-11-08 00:00:...|
|Map(k2 -> 0)|[4, 5, 6]|[4,5,6]|2014-11-08 00:00:...|
+------------+---------+-------+--------------------+



In [74]:
df.select('col_list').withColumn("col",df.col_list[0]+2).show()

+---------+---+
| col_list|col|
+---------+---+
|[1, 2, 3]|  3|
|[4, 5, 6]|  6|
+---------+---+



In [138]:
df.withColumn("col",df.col_list[0]+2).show()

+------------+---------+-------+--------------------+---+
|    col_dict| col_list|col_row|            col_time|col|
+------------+---------+-------+--------------------+---+
|Map(k1 -> 0)|[1, 2, 3]|[1,2,3]|2014-11-08 00:00:...|  3|
|Map(k2 -> 0)|[4, 5, 6]|[4,5,6]|2014-11-08 00:00:...|  6|
+------------+---------+-------+--------------------+---+



In [75]:
df.map(lambda x : x).show()

AttributeError: 'DataFrame' object has no attribute 'map'

In [76]:
df.withColumnRenamed('col_list','list').show()
#new name assign and new data frame is created

+------------+---------+-------+--------------------+
|    col_dict|     list|col_row|            col_time|
+------------+---------+-------+--------------------+
|Map(k1 -> 0)|[1, 2, 3]|[1,2,3]|2014-11-08 00:00:...|
|Map(k2 -> 0)|[4, 5, 6]|[4,5,6]|2014-11-08 00:00:...|
+------------+---------+-------+--------------------+



In [78]:
df.select('col_list',df.col_list.alias('list')).show()
#alias method is use to give name to column

+---------+---------+
| col_list|     list|
+---------+---------+
|[1, 2, 3]|[1, 2, 3]|
|[4, 5, 6]|[4, 5, 6]|
+---------+---------+



##### We can convert Spark DataFrame to Pandas DataFrame

In [79]:
import pandas

In [80]:
df

DataFrame[col_dict: map<string,bigint>, col_list: array<bigint>, col_row: struct<_1:bigint,_2:bigint,_3:bigint>, col_time: timestamp]

In [81]:
df_pandas = df.toPandas()

In [82]:
df_pandas
#Pandas Dataframe are in-memory on single machine

Unnamed: 0,col_dict,col_list,col_row,col_time
0,{u'k1': 0},"[1, 2, 3]","(1, 2, 3)",2014-11-08
1,{u'k2': 0},"[4, 5, 6]","(4, 5, 6)",2014-11-08


###### convert Pandas DataFrame to Spark DataFrame

In [84]:
df_spark = sqlcontext.createDataFrame(df_pandas).show()
#this Soark dataFrame is spread across multiple data node of Spark Cluster

+------------+---------+-------+-------------------+
|    col_dict| col_list|col_row|           col_time|
+------------+---------+-------+-------------------+
|Map(k1 -> 0)|[1, 2, 3]|[1,2,3]|1415404800000000000|
|Map(k2 -> 0)|[4, 5, 6]|[4,5,6]|1415404800000000000|
+------------+---------+-------+-------------------+



# SPARK 2


## SPARKSESSION

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('Crime_Data').getOrCreate()

In [4]:
data = spark.read.format("csv")\
                .option("header","true")\
                .load("file:///home/icpl12900/Desktop/assignments/Spark_data/london_crime_by_lsoa.csv")
        
#.load read csv file
#.read allow u to read file

In [5]:
data.printSchema()

root
 |-- lsoa_code: string (nullable = true)
 |-- borough: string (nullable = true)
 |-- major_category: string (nullable = true)
 |-- minor_category: string (nullable = true)
 |-- value: string (nullable = true)
 |-- year: string (nullable = true)
 |-- month: string (nullable = true)



In [6]:
data.show()

+---------+--------------------+--------------------+--------------------+-----+----+-----+
|lsoa_code|             borough|      major_category|      minor_category|value|year|month|
+---------+--------------------+--------------------+--------------------+-----+----+-----+
|E01001116|             Croydon|            Burglary|Burglary in Other...|    0|2016|   11|
|E01001646|           Greenwich|Violence Against ...|      Other violence|    0|2016|   11|
|E01000677|             Bromley|Violence Against ...|      Other violence|    0|2015|    5|
|E01003774|           Redbridge|            Burglary|Burglary in Other...|    0|2016|    3|
|E01004563|          Wandsworth|             Robbery|   Personal Property|    0|2008|    6|
|E01001320|              Ealing|  Theft and Handling|         Other Theft|    0|2012|    5|
|E01001342|              Ealing|Violence Against ...|    Offensive Weapon|    0|2010|    7|
|E01002633|            Hounslow|             Robbery|   Personal Property|    0|

In [7]:
data.head(2)
#it return Result in form of RDD

[Row(lsoa_code=u'E01001116', borough=u'Croydon', major_category=u'Burglary', minor_category=u'Burglary in Other Buildings', value=u'0', year=u'2016', month=u'11'),
 Row(lsoa_code=u'E01001646', borough=u'Greenwich', major_category=u'Violence Against the Person', minor_category=u'Other violence', value=u'0', year=u'2016', month=u'11')]

In [8]:
data.limit(2).show()
#it return result in form of DataFrame 

+---------+---------+--------------------+--------------------+-----+----+-----+
|lsoa_code|  borough|      major_category|      minor_category|value|year|month|
+---------+---------+--------------------+--------------------+-----+----+-----+
|E01001116|  Croydon|            Burglary|Burglary in Other...|    0|2016|   11|
|E01001646|Greenwich|Violence Against ...|      Other violence|    0|2016|   11|
+---------+---------+--------------------+--------------------+-----+----+-----+



In [9]:
data.count()

13490604

In [10]:
data.dropna()

DataFrame[lsoa_code: string, borough: string, major_category: string, minor_category: string, value: string, year: string, month: string]

In [11]:
data.select('borough').distinct().show()

+--------------------+
|             borough|
+--------------------+
|             Croydon|
|          Wandsworth|
|              Bexley|
|             Lambeth|
|Barking and Dagenham|
|              Camden|
|           Greenwich|
|              Newham|
|       Tower Hamlets|
|            Hounslow|
|              Barnet|
|              Harrow|
|Kensington and Ch...|
|           Islington|
|               Brent|
|            Haringey|
|             Bromley|
|              Merton|
|         Westminster|
|             Hackney|
+--------------------+
only showing top 20 rows



In [12]:
data.select('borough').distinct().count()

33

In [13]:
data.filter(data['borough'] == 'Croydon').show(2)

+---------+-------+--------------+--------------------+-----+----+-----+
|lsoa_code|borough|major_category|      minor_category|value|year|month|
+---------+-------+--------------+--------------------+-----+----+-----+
|E01001116|Croydon|      Burglary|Burglary in Other...|    0|2016|   11|
|E01001029|Croydon|         Drugs| Possession Of Drugs|    0|2010|   12|
+---------+-------+--------------+--------------------+-----+----+-----+
only showing top 2 rows



In [14]:
data_2015_2016 = data.filter(data['year'].isin("2015","2016"))
data_2015_2016.sample(True,fraction=0.1).show()

+---------+----------+--------------------+--------------------+-----+----+-----+
|lsoa_code|   borough|      major_category|      minor_category|value|year|month|
+---------+----------+--------------------+--------------------+-----+----+-----+
|E01002634|  Hounslow|     Criminal Damage|Criminal Damage T...|    0|2015|    2|
|E01002360|  Havering|    Fraud or Forgery|  Counted per Victim|    0|2015|   11|
|E01002122|    Harrow|    Fraud or Forgery|Other Fraud & For...|    0|2015|    2|
|E01002804| Islington|  Theft and Handling|Theft/Taking Of M...|    0|2016|    6|
|E01001807|   Hackney|  Theft and Handling|  Other Theft Person|    0|2016|    8|
|E01003625|    Newham|Violence Against ...|      Other violence|    0|2015|    3|
|E01002231|    Harrow|               Drugs| Possession Of Drugs|    0|2016|   11|
|E01000829|   Bromley|     Criminal Damage|Criminal Damage T...|    0|2015|    9|
|E01032562|   Bromley|     Criminal Damage|Other Criminal Da...|    0|2015|   12|
|E01033739| Gree

In [15]:
data_2015_2016 = data.filter(data['year'].isin("2015","2016"))
data_2015_2016.sample(False,fraction=0.1).show()

+---------+--------------+--------------------+--------------------+-----+----+-----+
|lsoa_code|       borough|      major_category|      minor_category|value|year|month|
+---------+--------------+--------------------+--------------------+-----+----+-----+
|E01004346|Waltham Forest|  Theft and Handling|Motor Vehicle Int...|    0|2016|    3|
|E01001807|       Hackney|  Theft and Handling|  Other Theft Person|    0|2016|    8|
|E01000486|         Brent|               Drugs| Possession Of Drugs|    0|2015|   11|
|E01000353|        Bexley|Violence Against ...| Assault with Injury|    0|2015|    6|
|E01000829|       Bromley|     Criminal Damage|Criminal Damage T...|    0|2015|    9|
|E01002367|      Havering|Other Notifiable ...|    Other Notifiable|    0|2016|    6|
|E01002426|    Hillingdon|Violence Against ...|        Wounding/GBH|    0|2016|    9|
|E01001208|        Ealing|  Theft and Handling|  Other Theft Person|    0|2015|    8|
|E01001846|       Hackney|Violence Against ...|       

In [16]:
data_2015_2016 = data.filter(data['year'].isin("2015","2016")).show()

+---------+--------------------+--------------------+--------------------+-----+----+-----+
|lsoa_code|             borough|      major_category|      minor_category|value|year|month|
+---------+--------------------+--------------------+--------------------+-----+----+-----+
|E01001116|             Croydon|            Burglary|Burglary in Other...|    0|2016|   11|
|E01001646|           Greenwich|Violence Against ...|      Other violence|    0|2016|   11|
|E01000677|             Bromley|Violence Against ...|      Other violence|    0|2015|    5|
|E01003774|           Redbridge|            Burglary|Burglary in Other...|    0|2016|    3|
|E01004177|              Sutton|  Theft and Handling|Theft/Taking of P...|    1|2016|    8|
|E01003076|             Lambeth|Violence Against ...|      Other violence|    0|2015|    4|
|E01002398|          Hillingdon|  Theft and Handling|Theft/Taking Of M...|    0|2016|    2|
|E01002945|Kingston upon Thames|  Theft and Handling|    Theft From Shops|    0|

##### Spark sampling functions allows to take different samples following distributions or only take a couple of them. In Spark, there are two sampling operations, the transformation sample and the action takeSample. By using a transformation we can tell Spark to apply successive transformation on a sample of a given RDD. By using an action we retrieve a given sample and we can have it in local memory to be used by any other standard library.



### Groupby and AGG
###### In RDD we can use "GROUPBY" only on "PAIR RDD" and groupby with key only

In [17]:
group_month = data.filter(data['year']=='2014')\
                    .groupBy('month')\
                    .agg({"value":"sum"})\
                    .withColumnRenamed("sum(value)","monthly")

In [18]:
group_month.show(2)

+-----+-------+
|month|monthly|
+-----+-------+
|    7|58564.0|
|   11|59704.0|
+-----+-------+
only showing top 2 rows



In [19]:
total = group_month.agg({"monthly":"sum"})

In [20]:
total.show()

+------------+
|sum(monthly)|
+------------+
|    680183.0|
+------------+



In [21]:
total_sum = total.collect()[0][0]

In [22]:
total_sum

680183.0

In [23]:
import pyspark.sql.functions as func

In [24]:
percentage = group_month.select(['month','monthly']).withColumn('month_"percent',func.round(((group_month.monthly / total_sum)*100) ,2))
#new column have name monthly which is created with "withColumn"
#if we give same name of column which is in dataframe in "withColumn" then it replace old column with new column value

In [25]:
percentage.printSchema()

root
 |-- month: string (nullable = true)
 |-- monthly: double (nullable = true)
 |-- month_"percent: double (nullable = true)



In [26]:
percentage.show(2)

+-----+-------+--------------+
|month|monthly|month_"percent|
+-----+-------+--------------+
|    7|58564.0|          8.61|
|   11|59704.0|          8.78|
+-----+-------+--------------+
only showing top 2 rows



In [27]:
data.agg({"month":"sum"}).show()

+-----------+
| sum(month)|
+-----------+
|8.7688926E7|
+-----------+



In [28]:
data.orderBy(data['month'].desc()).show()

+---------+--------------------+--------------------+--------------------+-----+----+-----+
|lsoa_code|             borough|      major_category|      minor_category|value|year|month|
+---------+--------------------+--------------------+--------------------+-----+----+-----+
|E01000389|              Bexley|               Drugs|    Drug Trafficking|    0|2009|    9|
|E01002050|            Haringey|    Fraud or Forgery|Other Fraud & For...|    0|2016|    9|
|E01002191|              Harrow|     Criminal Damage|Criminal Damage T...|    0|2011|    9|
|E01000194|              Barnet|Violence Against ...|    Offensive Weapon|    0|2009|    9|
|E01001423|             Enfield|Violence Against ...|          Harassment|    0|2012|    9|
|E01000491|               Brent|  Theft and Handling|Handling Stolen G...|    0|2012|    9|
|E01002701|           Islington|  Theft and Handling|Theft From Motor ...|    0|2010|    9|
|E01003887|Richmond upon Thames|               Drugs| Possession Of Drugs|    0|

In [33]:
data.crosstab('borough','major_category').select('Drugs').show()
#borough is Y-AXIS and major_category is X-AXIS

+-----+
|Drugs|
+-----+
|32616|
|29160|
|35424|
|37368|
|42336|
|44064|
|22140|
|32616|
|23004|
|43740|
|46980|
|26244|
|36504|
|34128|
|38772|
|41580|
|  756|
|26784|
|45144|
|31212|
+-----+
only showing top 20 rows



### UDF

In [3]:
players =  spark.read.format("csv")\
                .option("header","true")\
                .load("file:///home/icpl12900/Desktop/assignments/Spark_data/player.csv")
        

In [4]:
player_attribute = spark.read.format("csv")\
                .option("header","true")\
                .load("file:///home/icpl12900/Desktop/assignments/Spark_data/player_attributes.csv")

In [5]:
player_attribute.printSchema()

root
 |-- id: string (nullable = true)
 |-- player_fifa_api_id: string (nullable = true)
 |-- player_api_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- overall_rating: string (nullable = true)
 |-- potential: string (nullable = true)
 |-- preferred_foot: string (nullable = true)
 |-- attacking_work_rate: string (nullable = true)
 |-- defensive_work_rate: string (nullable = true)
 |-- crossing: string (nullable = true)
 |-- finishing: string (nullable = true)
 |-- heading_accuracy: string (nullable = true)
 |-- short_passing: string (nullable = true)
 |-- volleys: string (nullable = true)
 |-- dribbling: string (nullable = true)
 |-- curve: string (nullable = true)
 |-- free_kick_accuracy: string (nullable = true)
 |-- long_passing: string (nullable = true)
 |-- ball_control: string (nullable = true)
 |-- acceleration: string (nullable = true)
 |-- sprint_speed: string (nullable = true)
 |-- agility: string (nullable = true)
 |-- reactions: string (nullable = true

In [6]:
players.printSchema()

root
 |-- id: string (nullable = true)
 |-- player_api_id: string (nullable = true)
 |-- player_name: string (nullable = true)
 |-- player_fifa_api_id: string (nullable = true)
 |-- birthday: string (nullable = true)
 |-- height: string (nullable = true)
 |-- weight: string (nullable = true)



In [7]:
from pyspark.sql.functions import udf

In [8]:
udf

<function pyspark.sql.functions.udf>

In [9]:
year_extract_udf = udf(lambda date : date.split('-')[0])

In [10]:
new_player_attribute = player_attribute.withColumn('year' , year_extract_udf(player_attribute.date))

In [11]:
new_player_attribute.printSchema()

root
 |-- id: string (nullable = true)
 |-- player_fifa_api_id: string (nullable = true)
 |-- player_api_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- overall_rating: string (nullable = true)
 |-- potential: string (nullable = true)
 |-- preferred_foot: string (nullable = true)
 |-- attacking_work_rate: string (nullable = true)
 |-- defensive_work_rate: string (nullable = true)
 |-- crossing: string (nullable = true)
 |-- finishing: string (nullable = true)
 |-- heading_accuracy: string (nullable = true)
 |-- short_passing: string (nullable = true)
 |-- volleys: string (nullable = true)
 |-- dribbling: string (nullable = true)
 |-- curve: string (nullable = true)
 |-- free_kick_accuracy: string (nullable = true)
 |-- long_passing: string (nullable = true)
 |-- ball_control: string (nullable = true)
 |-- acceleration: string (nullable = true)
 |-- sprint_speed: string (nullable = true)
 |-- agility: string (nullable = true)
 |-- reactions: string (nullable = true

In [12]:
pa_2016 = new_player_attribute.filter(new_player_attribute.year == 2016)

In [13]:
pa_2016.show(2)

+---+------------------+-------------+-------------------+--------------+---------+--------------+-------------------+-------------------+--------+---------+----------------+-------------+-------+---------+-----+------------------+------------+------------+------------+------------+-------+---------+-------+----------+-------+-------+--------+----------+----------+-------------+-----------+------+---------+-------+---------------+--------------+---------+-----------+----------+--------------+-----------+----+
| id|player_fifa_api_id|player_api_id|               date|overall_rating|potential|preferred_foot|attacking_work_rate|defensive_work_rate|crossing|finishing|heading_accuracy|short_passing|volleys|dribbling|curve|free_kick_accuracy|long_passing|ball_control|acceleration|sprint_speed|agility|reactions|balance|shot_power|jumping|stamina|strength|long_shots|aggression|interceptions|positioning|vision|penalties|marking|standing_tackle|sliding_tackle|gk_diving|gk_handling|gk_kicking|gk_

In [14]:
pa_striker_2016 = pa_2016.groupBy('player_api_id').\
                        agg({"finishing" : 'avg',
                            'shot_power':'avg',
                            'acceleration':'avg'})

In [15]:
pa_striker_2016.printSchema()

root
 |-- player_api_id: string (nullable = true)
 |-- avg(finishing): double (nullable = true)
 |-- avg(acceleration): double (nullable = true)
 |-- avg(shot_power): double (nullable = true)



In [16]:
pa_striker_2016=pa_striker_2016.withColumnRenamed('avg(finishing)','finishing')\
              .withColumnRenamed('avg(acceleration)','acceleration')\
              .withColumnRenamed('avg(shot_power)','shot_power')

In [17]:
pa_striker_2016.printSchema()

root
 |-- player_api_id: string (nullable = true)
 |-- finishing: double (nullable = true)
 |-- acceleration: double (nullable = true)
 |-- shot_power: double (nullable = true)



In [18]:
finishing = 1
acceleration = 2
shot_power = 1
total = finishing + acceleration + shot_power

In [66]:
total

4

In [67]:
striker = pa_striker_2016.withColumn("striker_grade" , (pa_striker_2016.finishing * finishing +\
                                                       pa_striker_2016.acceleration * acceleration +\
                                                       pa_striker_2016.shot_power * shot_power)/total)

In [68]:
striker.printSchema()

root
 |-- player_api_id: string (nullable = true)
 |-- finishing: double (nullable = true)
 |-- acceleration: double (nullable = true)
 |-- shot_power: double (nullable = true)
 |-- striker_grade: double (nullable = true)



In [69]:
striker = striker.sort(striker.striker_grade.desc())

In [70]:
striker.show(5)

+-------------+---------+------------+----------+-------------+
|player_api_id|finishing|acceleration|shot_power|striker_grade|
+-------------+---------+------------+----------+-------------+
|        37412|     90.0|        92.0|      87.0|        90.25|
|        38817|     88.0|        90.0|      88.5|       89.125|
|       150565|     88.0|        95.0|      78.0|         89.0|
|        31921|     81.0|        93.0|      87.0|         88.5|
|        30834|     85.0|        90.0|      86.0|        87.75|
+-------------+---------+------------+----------+-------------+
only showing top 5 rows



In [71]:
striker=striker.drop('finishing','acceleration','shot_power')

In [72]:
striker.show(2)

+-------------+-------------+
|player_api_id|striker_grade|
+-------------+-------------+
|        37412|        90.25|
|        38817|       89.125|
+-------------+-------------+
only showing top 2 rows



In [73]:
players.printSchema()

root
 |-- id: string (nullable = true)
 |-- player_api_id: string (nullable = true)
 |-- player_name: string (nullable = true)
 |-- player_fifa_api_id: string (nullable = true)
 |-- birthday: string (nullable = true)
 |-- height: string (nullable = true)
 |-- weight: string (nullable = true)



In [74]:
best_striker = striker.join(players,players.player_api_id == striker.player_api_id )

In [75]:
best_striker.show(2)

+-------------+-------------+----+-------------+-------------+------------------+-------------------+------+------+
|player_api_id|striker_grade|  id|player_api_id|  player_name|player_fifa_api_id|           birthday|height|weight|
+-------------+-------------+----+-------------+-------------+------------------+-------------------+------+------+
|        37412|        90.25|9674|        37412|Sergio Aguero|            153079|1988-06-02 00:00:00|172.72|   163|
|        38817|       89.125|1581|        38817| Carlos Tevez|            143001|1984-02-05 00:00:00|172.72|   157|
+-------------+-------------+----+-------------+-------------+------------------+-------------------+------+------+
only showing top 2 rows



In [76]:
best_striker = striker.join(players,['player_api_id'] )

In [77]:
best_striker.show(2)

+-------------+-------------+----+-------------+------------------+-------------------+------+------+
|player_api_id|striker_grade|  id|  player_name|player_fifa_api_id|           birthday|height|weight|
+-------------+-------------+----+-------------+------------------+-------------------+------+------+
|        37412|        90.25|9674|Sergio Aguero|            153079|1988-06-02 00:00:00|172.72|   163|
|        38817|       89.125|1581| Carlos Tevez|            143001|1984-02-05 00:00:00|172.72|   157|
+-------------+-------------+----+-------------+------------------+-------------------+------+------+
only showing top 2 rows



### BROADCAST VARIABLES

WHEN DATASETS IS HUGE THEN JOIN OPERATIONS ON THAT DATASET IS VERY HEAVY OPERATION. BCOZ DATASETS ARE DISTRIBUTE ACROSS NODES IN CLUSTER. SO USE BROADCAST IN WHICH ONE DATASET WHICH IS SMALL IS COPY TO WORKER NODE AND JOIN PRFORM OVER THERE.

In [79]:
from pyspark.sql.functions import broadcast

In [80]:
striker_details = players.select('player_api_id' ,'player_name').\
                            join(
                                    broadcast(striker),
                                    ['player_api_id'],
                                    'inner' )

In [81]:
striker_details.show(2)

+-------------+------------------+-----------------+
|player_api_id|       player_name|    striker_grade|
+-------------+------------------+-----------------+
|       505942|Aaron Appindangoye|            54.75|
|       155782|   Aaron Cresswell|70.41666666666666|
+-------------+------------------+-----------------+
only showing top 2 rows



### ACCUMULATOR

#### HIGH HEADING ACCURACY DIVIDED BY BUCKET

In [127]:
players_heading_acc = player_attribute.select('player_api_id','heading_accuracy').\
                                       join(
                                            broadcast(players),
                                                 ['player_api_id'],
                                                'inner' 
                                            )

In [107]:
players_heading_acc.printSchema()

root
 |-- player_api_id: string (nullable = true)
 |-- heading_accuracy: string (nullable = true)
 |-- id: string (nullable = true)
 |-- player_name: string (nullable = true)
 |-- player_fifa_api_id: string (nullable = true)
 |-- birthday: string (nullable = true)
 |-- height: string (nullable = true)
 |-- weight: string (nullable = true)



###### HEIGHT

In [84]:
short = spark.sparkContext.accumulator(0)
medium = spark.sparkContext.accumulator(0)
tall = spark.sparkContext.accumulator(0)

In [89]:
def count_player_height(row):
    height = float(row.height)
    if (height<=175):
        short.add(1)
    elif (height <= 195):
        medium.add(1)
    else:
        tall.add(1)
    

In [90]:
players_heading_acc.foreach(lambda x : count_player_height(x))

In [91]:
all_players = [short.value,
              medium.value,
              tall.value]
all_players

[19204, 161369, 187383]

###### heading_accuracy

In [108]:
short_ha = spark.sparkContext.accumulator(0)
medium_ha = spark.sparkContext.accumulator(0)
tall_ha = spark.sparkContext.accumulator(0)

In [121]:
def count_player_headingAccuracy(row):
    head = row.heading_accuracy
    if(head>=75 and head<=175):
        short_ha.add(1)
    elif (head <= 195):
        medium_ha.add(1)
    else:
        tall_ha.add(1)

In [122]:
players_heading_acc.foreach(lambda x : count_player_headingAccuracy(x))

In [124]:
all_players_heading = [short_ha.value,
                      medium_ha.value,
                      tall_ha.value]
all_players_heading

[1672, 836, 549426]

In [126]:
percentage_value = [(short_ha.value/short.value) *100,
                      (medium_ha.value/medium.value) *100,
                      (tall_ha.value/tall.value) *100]

percentage_value

[0, 0, 200]

## STORE DATAFRAME TO FILE

In [128]:
pa_2016.columns

['id',
 'player_fifa_api_id',
 'player_api_id',
 'date',
 'overall_rating',
 'potential',
 'preferred_foot',
 'attacking_work_rate',
 'defensive_work_rate',
 'crossing',
 'finishing',
 'heading_accuracy',
 'short_passing',
 'volleys',
 'dribbling',
 'curve',
 'free_kick_accuracy',
 'long_passing',
 'ball_control',
 'acceleration',
 'sprint_speed',
 'agility',
 'reactions',
 'balance',
 'shot_power',
 'jumping',
 'stamina',
 'strength',
 'long_shots',
 'aggression',
 'interceptions',
 'positioning',
 'vision',
 'penalties',
 'marking',
 'standing_tackle',
 'sliding_tackle',
 'gk_diving',
 'gk_handling',
 'gk_kicking',
 'gk_positioning',
 'gk_reflexes',
 'year']

In [131]:
pa_2016.select( 'player_api_id','date')\
        .coalesce(1)\
        .write\
        .option("header","true")\
        .csv("file:///home/icpl12900/Desktop/assignments/Spark_data/player_date.csv")
        
#coalesce(1) --- to repartitioned DataFrame to single partition, to write into single file. here we want 
#one partition  so use 1 as argument
#player_date.csv it is directiory

## USE CUSTOM ACCUMULATOR

In [132]:
from pyspark.accumulators import AccumulatorParam

In [133]:
class VectorAccumulator(AccumulatorParam):
    def zero(self,value):
        return [0.0] * len(value)
    def addInPlace(self,v1,v2):
        for i in range(len(v1)):
            v1[i] += v2[i]
        return v1

In [134]:
vector_acc = sc.accumulator([10.0,20.0],VectorAccumulator())
vector_acc.value

[10.0, 20.0]

## BASIC SQL OPERATIONS ON SPARK

In [19]:
pa_2016

DataFrame[id: string, player_fifa_api_id: string, player_api_id: string, date: string, overall_rating: string, potential: string, preferred_foot: string, attacking_work_rate: string, defensive_work_rate: string, crossing: string, finishing: string, heading_accuracy: string, short_passing: string, volleys: string, dribbling: string, curve: string, free_kick_accuracy: string, long_passing: string, ball_control: string, acceleration: string, sprint_speed: string, agility: string, reactions: string, balance: string, shot_power: string, jumping: string, stamina: string, strength: string, long_shots: string, aggression: string, interceptions: string, positioning: string, vision: string, penalties: string, marking: string, standing_tackle: string, sliding_tackle: string, gk_diving: string, gk_handling: string, gk_kicking: string, gk_positioning: string, gk_reflexes: string, year: string]

In [20]:
pa_2016.createOrReplaceTempView('table_2016')


##### It create temporary view for dataFrame pa_2016 and it is temporary . It create only for this session. Once session is closed view also disappear
#### It Register DataFrame as Table For Per-Session

In [22]:
all_data = sqlContext.sql("select * from table_2016")

In [25]:
all_data.show(1)

+---+------------------+-------------+-------------------+--------------+---------+--------------+-------------------+-------------------+--------+---------+----------------+-------------+-------+---------+-----+------------------+------------+------------+------------+------------+-------+---------+-------+----------+-------+-------+--------+----------+----------+-------------+-----------+------+---------+-------+---------------+--------------+---------+-----------+----------+--------------+-----------+----+
| id|player_fifa_api_id|player_api_id|               date|overall_rating|potential|preferred_foot|attacking_work_rate|defensive_work_rate|crossing|finishing|heading_accuracy|short_passing|volleys|dribbling|curve|free_kick_accuracy|long_passing|ball_control|acceleration|sprint_speed|agility|reactions|balance|shot_power|jumping|stamina|strength|long_shots|aggression|interceptions|positioning|vision|penalties|marking|standing_tackle|sliding_tackle|gk_diving|gk_handling|gk_kicking|gk_

In [28]:
sqlContext.sql('select id from table_2016 where id == 1').show()

+---+
| id|
+---+
|  1|
+---+



##### To make avaliable Global table to all SPARK SESSION  use "createGlobalTempView"
#### And to access this global table use  "global_temp.tableName"

In [29]:
pa_2016.createGlobalTempView('table_2016')


In [31]:
sqlContext.sql("select * from global_temp.table_2016").show(1)

+---+------------------+-------------+-------------------+--------------+---------+--------------+-------------------+-------------------+--------+---------+----------------+-------------+-------+---------+-----+------------------+------------+------------+------------+------------+-------+---------+-------+----------+-------+-------+--------+----------+----------+-------------+-----------+------+---------+-------+---------------+--------------+---------+-----------+----------+--------------+-----------+----+
| id|player_fifa_api_id|player_api_id|               date|overall_rating|potential|preferred_foot|attacking_work_rate|defensive_work_rate|crossing|finishing|heading_accuracy|short_passing|volleys|dribbling|curve|free_kick_accuracy|long_passing|ball_control|acceleration|sprint_speed|agility|reactions|balance|shot_power|jumping|stamina|strength|long_shots|aggression|interceptions|positioning|vision|penalties|marking|standing_tackle|sliding_tackle|gk_diving|gk_handling|gk_kicking|gk_

# AirLine DATA

In [34]:
airlinesPath = 'file:///home/icpl12900/Desktop/assignments/Spark_data/airline.csv'
flightPath = 'file:///home/icpl12900/Desktop/assignments/Spark_data/flights.csv'
airportPath = 'file:///home/icpl12900/Desktop/assignments/Spark_data/airports.csv'


In [36]:
airline = spark.read.format("csv")\
                .option("header","true")\
                .load(airlinesPath)
        

In [37]:
airport = spark.read.format("csv")\
                .option("header","true")\
                .load(airportPath)
        

In [38]:
flight = spark.read.format("csv")\
                .option("header","true")\
                .load(flightPath)

In [39]:
flight

DataFrame[date: string, airlines: string, flight_number: string, origin: string, destination: string, departure: string, departure_delay: string, arrival: string, arrival_delay: string, air_time: string, distance: string]

In [40]:
flight.show(1)

+----------+--------+-------------+------+-----------+---------+---------------+-------+-------------+--------+--------+
|      date|airlines|flight_number|origin|destination|departure|departure_delay|arrival|arrival_delay|air_time|distance|
+----------+--------+-------------+------+-----------+---------+---------------+-------+-------------+--------+--------+
|2014-04-01|   19805|            1|   JFK|        LAX|     0854|          -6.00|   1217|         2.00|  355.00| 2475.00|
+----------+--------+-------------+------+-----------+---------+---------------+-------+-------------+--------+--------+
only showing top 1 row



In [41]:
airline.createOrReplaceTempView('airline')

In [42]:
airport.createOrReplaceTempView('airport')

In [43]:
flight.createOrReplaceTempView('flight')

In [45]:
airline.columns

['1', 'Private flight', '\\N', '-', 'N/A', '_c5', '_c6', 'Y']

In [46]:
airport.columns

['Code', 'Description']

In [47]:
flight.columns

['date',
 'airlines',
 'flight_number',
 'origin',
 'destination',
 'departure',
 'departure_delay',
 'arrival',
 'arrival_delay',
 'air_time',
 'distance']

In [50]:
sqlContext.sql("select airlines from flight").show(1)

+--------+
|airlines|
+--------+
|   19805|
+--------+
only showing top 1 row



###### Table is dataFrame so we can use SQL Query as well as DataFrame operations on it

#### We want to calculate Total distance for All Flights

In [48]:
total_distance = sqlContext.sql("select distance from flight")\
                            .agg({"distance":"sum"})\
                            .withColumnRenamed("sum(distance)","total_distance")

In [49]:
total_distance.show()

+--------------+
|total_distance|
+--------------+
|  3.79052917E8|
+--------------+



##### If we want to calculate Total Distance for Particular Airline 

In [55]:
total_distance_per_airline = sqlContext.sql("select airlines,distance from flight")\
                            .groupBy("airlines")\
                            .agg({"distance":"sum"})\
                            .withColumnRenamed("sum(distance)","total_distance")

In [56]:
total_distance_per_airline.show(1)

+--------+--------------+
|airlines|total_distance|
+--------+--------------+
|   19690|     3317412.0|
+--------+--------------+
only showing top 1 row



### Departure_delay

In [57]:
flight.columns

['date',
 'airlines',
 'flight_number',
 'origin',
 'destination',
 'departure',
 'departure_delay',
 'arrival',
 'arrival_delay',
 'air_time',
 'distance']

In [61]:
all_delay_2014 = sqlContext.sql('select date,airlines,departure_delay from flight where departure_delay >0 and year(date) = 2014')

In [62]:
all_delay_2014.show(1)

+----------+--------+---------------+
|      date|airlines|departure_delay|
+----------+--------+---------------+
|2014-04-01|   19805|          14.00|
+----------+--------+---------------+
only showing top 1 row



In [63]:
all_delay_2014

DataFrame[date: string, airlines: string, departure_delay: string]

In [68]:
all_delay_2014.createOrReplaceTempView('all_delay_2014_table')

In [73]:
all_delay_2014.orderBy(all_delay_2014.departure_delay.desc()).show(1)

+----------+--------+---------------+
|      date|airlines|departure_delay|
+----------+--------+---------------+
|2014-04-18|   20409|          99.00|
+----------+--------+---------------+
only showing top 1 row



In [76]:
total_delay_airlines = sqlContext.sql('select count(airlines) from all_delay_2014_table')

In [80]:
total_no_delay = total_delay_airlines.collect()[0][0]

In [89]:
total_no_delay

179015

In [81]:
total_airlines = sqlContext.sql('select count(airlines) from flight')

In [82]:
total_airlines

DataFrame[count(airlines): bigint]

In [83]:
total_airlines.show()

+---------------+
|count(airlines)|
+---------------+
|         476881|
+---------------+



In [84]:
total_flight = total_airlines.collect()[0][0]

In [88]:
total_flight

476881

#### Percentage of total delay flights

In [98]:
percentage = float(total_no_delay) / float(total_flight) * 100

In [99]:
percentage

37.53871510922012

##  Find Name of  Airline with Departure Delay 

In [100]:
flight.columns

['date',
 'airlines',
 'flight_number',
 'origin',
 'destination',
 'departure',
 'departure_delay',
 'arrival',
 'arrival_delay',
 'air_time',
 'distance']

In [101]:
delay_airline = sqlContext.sql('select airlines,departure_delay from flight')\
                        .groupBy('airlines')\
                        .agg({'departure_delay':'avg'})\
                        .withColumnRenamed('avg(departure_delay)','departure_delay')

In [103]:
delay_airline

DataFrame[airlines: string, departure_delay: double]

In [104]:
delay_airline.show(1)

+--------+-------------------+
|airlines|    departure_delay|
+--------+-------------------+
|   19690|-2.1981308411214955|
+--------+-------------------+
only showing top 1 row



In [108]:
delay_airline_order = delay_airline.orderBy(delay_airline.departure_delay.desc())

In [109]:
delay_airline_order.show()

+--------+-------------------+
|airlines|    departure_delay|
+--------+-------------------+
|   19393| 13.429567657134724|
|   20366| 12.296210112379818|
|   19977|  8.818392620527979|
|   20436|  8.716275167785234|
|   20409|   8.31110357194785|
|   20398|  7.372135487994157|
|   21171|  6.989682212133719|
|   19805|  6.733031255779545|
|   20304|   6.05227892794094|
|   19790|  5.597661140117859|
|   20437|  5.110621095185594|
|   20355| 3.9925874044242105|
|   19930|-0.6991515343747522|
|   19690|-2.1981308411214955|
+--------+-------------------+



## WINDOW FUNCTIONS

#### +----+----+----+----------+----------+

#### |col1|col2|rank|dense_rank|row_number|

#### +----+----+----+----------+----------+

#### |   a|  10|   1|         1|         1|

#### |   a|  10|   1|         1|         2|

#### |   a|  20|   3|         2|         3|

#### +----+----+----+----------+----------+


In [110]:
product = spark.read.format("csv")\
                .option("header","true")\
                .load('file:///home/icpl12900/Desktop/assignments/Spark_data/products.csv')

In [111]:
product

DataFrame[product: string, category: string, price: string]

In [112]:
import sys
from pyspark.sql.window import Window
import pyspark.sql.functions as func

In [125]:
window_product = Window.partitionBy(product['category'])\
                        .orderBy(product['price'].desc())

In [126]:
window_product

<pyspark.sql.window.WindowSpec at 0x7f8ccb125c50>

In [127]:
price_rank = (func.rank().over(window_product))

In [128]:
price_rank

Column<RANK() OVER (PARTITION BY category ORDER BY price DESC NULLS LAST UnspecifiedFrame)>

#### RANK()

In [129]:
product_rank = product.select(product['product'],product['category'],product['price'])\
                    .withColumn('rank',func.rank().over(window_product))

In [130]:
product_rank

DataFrame[product: string, category: string, price: string, rank: int]

In [131]:
product_rank.show()

+----------+--------+-----+----+
|   product|category|price|rank|
+----------+--------+-----+----+
|    iPhone|  Mobile|  999|   1|
|Samsung JX|  Mobile|  799|   2|
|Redmi Note|  Mobile|  399|   3|
|   OnePlus|  Mobile|  356|   4|
|        Mi|  Mobile|  299|   5|
|  Micromax|  Mobile|  249|   6|
|Samsung TX|  Tablet|  999|   1|
|      iPad|  Tablet|  789|   2|
|    Lenovo|  Tablet|  499|   3|
|        Xu|  Tablet|  267|   4|
+----------+--------+-----+----+



### ROWBETWEEN(-1,0)
##### Here we create window of current row and one before current row

In [142]:
window_product_row = Window.partitionBy(product['category'])\
                        .orderBy(product['price'].desc())\
                        .rowsBetween(-1,0)

In [143]:
window_product_row

<pyspark.sql.window.WindowSpec at 0x7f8ccb096810>

In [144]:
price_max = (func.max(product['price']).over(window_product_row))

In [145]:
price_max

Column<max(price) OVER (PARTITION BY category ORDER BY price DESC NULLS LAST ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)>

In [146]:
product.select(product['product'],product['category'],product['price'],price_max.alias("price_max")).show()
        

+----------+--------+-----+---------+
|   product|category|price|price_max|
+----------+--------+-----+---------+
|    iPhone|  Mobile|  999|      999|
|Samsung JX|  Mobile|  799|      999|
|Redmi Note|  Mobile|  399|      799|
|   OnePlus|  Mobile|  356|      399|
|        Mi|  Mobile|  299|      356|
|  Micromax|  Mobile|  249|      299|
|Samsung TX|  Tablet|  999|      999|
|      iPad|  Tablet|  789|      999|
|    Lenovo|  Tablet|  499|      789|
|        Xu|  Tablet|  267|      499|
+----------+--------+-----+---------+



#### rowsBetween(-1,1)
##### Here we select before , current, after

In [153]:
window_product_row = Window.partitionBy(product['category'])\
                        .orderBy(product['price'].desc())\
                        .rowsBetween(-1,1)

In [154]:
price_max = (func.max(product['price']).over(window_product_row))

In [155]:
product.select(product['product'],product['category'],product['price'],price_max.alias("price_max")).show()


+----------+--------+-----+---------+
|   product|category|price|price_max|
+----------+--------+-----+---------+
|    iPhone|  Mobile|  999|      999|
|Samsung JX|  Mobile|  799|      999|
|Redmi Note|  Mobile|  399|      799|
|   OnePlus|  Mobile|  356|      399|
|        Mi|  Mobile|  299|      356|
|  Micromax|  Mobile|  249|      299|
|Samsung TX|  Tablet|  999|      999|
|      iPad|  Tablet|  789|      999|
|    Lenovo|  Tablet|  499|      789|
|        Xu|  Tablet|  267|      499|
+----------+--------+-----+---------+



#### rowsBetween(-sys.maxsize,sys.maxsize)
##### It select all rows within that partition

In [156]:
window_product_row = Window.partitionBy(product['category'])\
                        .orderBy(product['price'].desc())\
                        .rowsBetween(-sys.maxsize,sys.maxsize)

In [157]:
price_max = (func.max(product['price']).over(window_product_row))

In [158]:
product.select(product['product'],product['category'],product['price'],price_max.alias("price_max")).show()


+----------+--------+-----+---------+
|   product|category|price|price_max|
+----------+--------+-----+---------+
|    iPhone|  Mobile|  999|      999|
|Samsung JX|  Mobile|  799|      999|
|Redmi Note|  Mobile|  399|      999|
|   OnePlus|  Mobile|  356|      999|
|        Mi|  Mobile|  299|      999|
|  Micromax|  Mobile|  249|      999|
|Samsung TX|  Tablet|  999|      999|
|      iPad|  Tablet|  789|      999|
|    Lenovo|  Tablet|  499|      999|
|        Xu|  Tablet|  267|      999|
+----------+--------+-----+---------+



#### rangeBetween(-sys.maxsize,sys.maxsize)
##### It select all rows within that partition

In [159]:
window_product_row = Window.partitionBy(product['category'])\
                        .orderBy(product['price'].desc())\
                        .rangeBetween(-sys.maxsize,sys.maxsize)

In [160]:
price_max = (func.max(product['price']).over(window_product_row))

In [161]:
product.select(product['product'],product['category'],product['price'],price_max.alias("price_max")).show()


+----------+--------+-----+---------+
|   product|category|price|price_max|
+----------+--------+-----+---------+
|    iPhone|  Mobile|  999|      999|
|Samsung JX|  Mobile|  799|      999|
|Redmi Note|  Mobile|  399|      999|
|   OnePlus|  Mobile|  356|      999|
|        Mi|  Mobile|  299|      999|
|  Micromax|  Mobile|  249|      999|
|Samsung TX|  Tablet|  999|      999|
|      iPad|  Tablet|  789|      999|
|    Lenovo|  Tablet|  499|      999|
|        Xu|  Tablet|  267|      999|
+----------+--------+-----+---------+



### PRODUCT PRICE DIFFERENCE

In [162]:
price_difference = (func.max(product['price']).over(window_product_row)  - product['price'])

In [164]:
product.select(product['product'],product['category'],product['price'],price_difference.alias("price_difference")).show()


+----------+--------+-----+----------------+
|   product|category|price|price_difference|
+----------+--------+-----+----------------+
|    iPhone|  Mobile|  999|             0.0|
|Samsung JX|  Mobile|  799|           200.0|
|Redmi Note|  Mobile|  399|           600.0|
|   OnePlus|  Mobile|  356|           643.0|
|        Mi|  Mobile|  299|           700.0|
|  Micromax|  Mobile|  249|           750.0|
|Samsung TX|  Tablet|  999|             0.0|
|      iPad|  Tablet|  789|           210.0|
|    Lenovo|  Tablet|  499|           500.0|
|        Xu|  Tablet|  267|           732.0|
+----------+--------+-----+----------------+

