# DataFrame

In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

In [2]:
conf = SparkConf().setMaster('local').setAppName('sparkApp')
spark = SparkContext(conf=conf)


In [3]:
sqlCtx = SQLContext(spark)
sqlCtx

<pyspark.sql.context.SQLContext at 0x15c5b523e10>

- csv 파일을 이용한 dataframe 만들기

In [4]:
orders = sqlCtx.read.csv('./data/orders.csv', header=True, inferSchema=True)
orders

DataFrame[OrderID: int, CustomerID: int, EmployeeID: int, OrderDate: string, ShipperID: double]

In [5]:
orders.printSchema()

root
 |-- OrderID: integer (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- EmployeeID: integer (nullable = true)
 |-- OrderDate: string (nullable = true)
 |-- ShipperID: double (nullable = true)



In [6]:
print(orders.columns)
print(type(orders.columns))

['OrderID', 'CustomerID', 'EmployeeID', 'OrderDate', 'ShipperID']
<class 'list'>


In [7]:
orders.describe().show()

+-------+-----------------+------------------+------------------+---------+------------------+
|summary|          OrderID|        CustomerID|        EmployeeID|OrderDate|         ShipperID|
+-------+-----------------+------------------+------------------+---------+------------------+
|  count|              196|               196|               196|      196|               196|
|   mean|          10345.5| 48.64795918367347|4.3520408163265305|     null|2.0714285714285716|
| stddev|56.72448031200168|25.621776513566466|  2.41651283366105|     null|0.7877263614433762|
|    min|            10248|                 2|                 1| 1/1/1997|               1.0|
|    max|            10443|                91|                 9| 9/9/1996|               3.0|
+-------+-----------------+------------------+------------------+---------+------------------+



In [8]:
orders.summary().show()

+-------+-----------------+------------------+------------------+---------+------------------+
|summary|          OrderID|        CustomerID|        EmployeeID|OrderDate|         ShipperID|
+-------+-----------------+------------------+------------------+---------+------------------+
|  count|              196|               196|               196|      196|               196|
|   mean|          10345.5| 48.64795918367347|4.3520408163265305|     null|2.0714285714285716|
| stddev|56.72448031200168|25.621776513566466|  2.41651283366105|     null|0.7877263614433762|
|    min|            10248|                 2|                 1| 1/1/1997|               1.0|
|    25%|            10296|                25|                 2|     null|               1.0|
|    50%|            10345|                51|                 4|     null|               2.0|
|    75%|            10394|                69|                 6|     null|               3.0|
|    max|            10443|                91|    

In [9]:
orders.first()

Row(OrderID=10248, CustomerID=90, EmployeeID=5, OrderDate='7/4/1996', ShipperID=3.0)

In [10]:
# 검색은 select() 사용
orders.select(['OrderID', 'CustomerID']).show()

+-------+----------+
|OrderID|CustomerID|
+-------+----------+
|  10248|        90|
|  10249|        81|
|  10250|        34|
|  10251|        84|
|  10252|        76|
|  10253|        34|
|  10254|        14|
|  10255|        68|
|  10256|        88|
|  10257|        35|
|  10258|        20|
|  10259|        13|
|  10260|        55|
|  10261|        61|
|  10262|        65|
|  10263|        20|
|  10264|        24|
|  10265|         7|
|  10266|        87|
|  10267|        25|
+-------+----------+
only showing top 20 rows



In [11]:
# withColumn() 함수
# order.show()

orders.withColumn('newOrderIdD', orders['OrderID']+2).show()

+-------+----------+----------+---------+---------+-----------+
|OrderID|CustomerID|EmployeeID|OrderDate|ShipperID|newOrderIdD|
+-------+----------+----------+---------+---------+-----------+
|  10248|        90|         5| 7/4/1996|      3.0|      10250|
|  10249|        81|         6| 7/5/1996|      1.0|      10251|
|  10250|        34|         4| 7/8/1996|      2.0|      10252|
|  10251|        84|         3| 7/8/1996|      1.0|      10253|
|  10252|        76|         4| 7/9/1996|      2.0|      10254|
|  10253|        34|         3|7/10/1996|      2.0|      10255|
|  10254|        14|         5|7/11/1996|      2.0|      10256|
|  10255|        68|         9|7/12/1996|      3.0|      10257|
|  10256|        88|         3|7/15/1996|      2.0|      10258|
|  10257|        35|         4|7/16/1996|      3.0|      10259|
|  10258|        20|         1|7/17/1996|      1.0|      10260|
|  10259|        13|         4|7/18/1996|      3.0|      10261|
|  10260|        55|         4|7/19/1996

In [12]:
orders.withColumnRenamed('OrderID', 'renameOrderID').show()

+-------------+----------+----------+---------+---------+
|renameOrderID|CustomerID|EmployeeID|OrderDate|ShipperID|
+-------------+----------+----------+---------+---------+
|        10248|        90|         5| 7/4/1996|      3.0|
|        10249|        81|         6| 7/5/1996|      1.0|
|        10250|        34|         4| 7/8/1996|      2.0|
|        10251|        84|         3| 7/8/1996|      1.0|
|        10252|        76|         4| 7/9/1996|      2.0|
|        10253|        34|         3|7/10/1996|      2.0|
|        10254|        14|         5|7/11/1996|      2.0|
|        10255|        68|         9|7/12/1996|      3.0|
|        10256|        88|         3|7/15/1996|      2.0|
|        10257|        35|         4|7/16/1996|      3.0|
|        10258|        20|         1|7/17/1996|      1.0|
|        10259|        13|         4|7/18/1996|      3.0|
|        10260|        55|         4|7/19/1996|      1.0|
|        10261|        61|         4|7/19/1996|      2.0|
|        10262

In [13]:
orders.show()

+-------+----------+----------+---------+---------+
|OrderID|CustomerID|EmployeeID|OrderDate|ShipperID|
+-------+----------+----------+---------+---------+
|  10248|        90|         5| 7/4/1996|      3.0|
|  10249|        81|         6| 7/5/1996|      1.0|
|  10250|        34|         4| 7/8/1996|      2.0|
|  10251|        84|         3| 7/8/1996|      1.0|
|  10252|        76|         4| 7/9/1996|      2.0|
|  10253|        34|         3|7/10/1996|      2.0|
|  10254|        14|         5|7/11/1996|      2.0|
|  10255|        68|         9|7/12/1996|      3.0|
|  10256|        88|         3|7/15/1996|      2.0|
|  10257|        35|         4|7/16/1996|      3.0|
|  10258|        20|         1|7/17/1996|      1.0|
|  10259|        13|         4|7/18/1996|      3.0|
|  10260|        55|         4|7/19/1996|      1.0|
|  10261|        61|         4|7/19/1996|      2.0|
|  10262|        65|         8|7/22/1996|      3.0|
|  10263|        20|         9|7/23/1996|      3.0|
|  10264|   

In [14]:
# groupby() - 집계함수


cospi

In [17]:
orders = sqlCtx.read.csv('./data/cospi.csv',
                        header = True,
                        inferSchema = True)
type(orders)

pyspark.sql.dataframe.DataFrame

In [18]:
orders.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: integer (nullable = true)
 |-- High: integer (nullable = true)
 |-- Low: integer (nullable = true)
 |-- Close: integer (nullable = true)
 |-- Volume: integer (nullable = true)



In [19]:
orders.show()

+-------------------+-------+-------+-------+-------+------+
|               Date|   Open|   High|    Low|  Close|Volume|
+-------------------+-------+-------+-------+-------+------+
|2016-02-26 00:00:00|1180000|1187000|1172000|1172000|176906|
|2016-02-25 00:00:00|1172000|1187000|1172000|1179000|128321|
|2016-02-24 00:00:00|1178000|1179000|1161000|1172000|140407|
|2016-02-23 00:00:00|1179000|1189000|1173000|1181000|147578|
|2016-02-22 00:00:00|1190000|1192000|1166000|1175000|174075|
|2016-02-19 00:00:00|1187000|1195000|1174000|1190000|175889|
|2016-02-18 00:00:00|1203000|1203000|1178000|1187000|211795|
|2016-02-17 00:00:00|1179000|1201000|1169000|1185000|245929|
|2016-02-16 00:00:00|1158000|1179000|1157000|1168000|179087|
|2016-02-15 00:00:00|1154000|1160000|1144000|1154000|182471|
|2016-02-12 00:00:00|1130000|1151000|1122000|1130000|254115|
|2016-02-11 00:00:00|1118000|1137000|1118000|1130000|304899|
|2016-02-05 00:00:00|1156000|1169000|1156000|1164000|183280|
|2016-02-04 00:00:00|115

In [26]:
# filter (조건식)
# 날짜가 2월인 데이터만 필터링한다면?

orders.filter(orders['Date'] >= '2016-02-01').show()

+-------------------+-------+-------+-------+-------+------+
|               Date|   Open|   High|    Low|  Close|Volume|
+-------------------+-------+-------+-------+-------+------+
|2016-02-26 00:00:00|1180000|1187000|1172000|1172000|176906|
|2016-02-25 00:00:00|1172000|1187000|1172000|1179000|128321|
|2016-02-24 00:00:00|1178000|1179000|1161000|1172000|140407|
|2016-02-23 00:00:00|1179000|1189000|1173000|1181000|147578|
|2016-02-22 00:00:00|1190000|1192000|1166000|1175000|174075|
|2016-02-19 00:00:00|1187000|1195000|1174000|1190000|175889|
|2016-02-18 00:00:00|1203000|1203000|1178000|1187000|211795|
|2016-02-17 00:00:00|1179000|1201000|1169000|1185000|245929|
|2016-02-16 00:00:00|1158000|1179000|1157000|1168000|179087|
|2016-02-15 00:00:00|1154000|1160000|1144000|1154000|182471|
|2016-02-12 00:00:00|1130000|1151000|1122000|1130000|254115|
|2016-02-11 00:00:00|1118000|1137000|1118000|1130000|304899|
|2016-02-05 00:00:00|1156000|1169000|1156000|1164000|183280|
|2016-02-04 00:00:00|115

In [32]:
# 종가가 1200000 이상인 데이터만 필터링한다면?

orders.filter(orders['Close'] >= 1200000).show()

+-------------------+-------+-------+-------+-------+------+
|               Date|   Open|   High|    Low|  Close|Volume|
+-------------------+-------+-------+-------+-------+------+
|2016-01-05 00:00:00|1202000|1218000|1186000|1208000|207947|
|2016-01-04 00:00:00|1260000|1260000|1205000|1205000|304050|
|2015-12-30 00:00:00|1260000|1272000|1254000|1260000|203349|
|2015-12-29 00:00:00|1265000|1266000|1241000|1254000|231802|
|2015-12-28 00:00:00|1285000|1289000|1266000|1266000|225997|
|2015-12-24 00:00:00|1295000|1300000|1285000|1285000|151322|
|2015-12-23 00:00:00|1292000|1299000|1282000|1295000|162043|
|2015-12-22 00:00:00|1280000|1292000|1267000|1292000|203938|
|2015-12-21 00:00:00|1278000|1285000|1261000|1280000|157354|
|2015-12-18 00:00:00|1265000|1288000|1264000|1278000|167721|
|2015-12-17 00:00:00|1301000|1308000|1275000|1290000|167390|
|2015-12-16 00:00:00|1278000|1310000|1278000|1299000|207688|
|2015-12-15 00:00:00|1261000|1280000|1260000|1277000|175253|
|2015-12-14 00:00:00|127

In [31]:
orders.filter('Close >= 1200000').show()

+-------------------+-------+-------+-------+-------+------+
|               Date|   Open|   High|    Low|  Close|Volume|
+-------------------+-------+-------+-------+-------+------+
|2016-01-05 00:00:00|1202000|1218000|1186000|1208000|207947|
|2016-01-04 00:00:00|1260000|1260000|1205000|1205000|304050|
|2015-12-30 00:00:00|1260000|1272000|1254000|1260000|203349|
|2015-12-29 00:00:00|1265000|1266000|1241000|1254000|231802|
|2015-12-28 00:00:00|1285000|1289000|1266000|1266000|225997|
|2015-12-24 00:00:00|1295000|1300000|1285000|1285000|151322|
|2015-12-23 00:00:00|1292000|1299000|1282000|1295000|162043|
|2015-12-22 00:00:00|1280000|1292000|1267000|1292000|203938|
|2015-12-21 00:00:00|1278000|1285000|1261000|1280000|157354|
|2015-12-18 00:00:00|1265000|1288000|1264000|1278000|167721|
|2015-12-17 00:00:00|1301000|1308000|1275000|1290000|167390|
|2015-12-16 00:00:00|1278000|1310000|1278000|1299000|207688|
|2015-12-15 00:00:00|1261000|1280000|1260000|1277000|175253|
|2015-12-14 00:00:00|127

In [33]:
# 조건에 대한 select

orders.filter(orders['Close'] >= 120000).select(['Date', 'Open', 'Close']).show()

+-------------------+-------+-------+
|               Date|   Open|  Close|
+-------------------+-------+-------+
|2016-02-26 00:00:00|1180000|1172000|
|2016-02-25 00:00:00|1172000|1179000|
|2016-02-24 00:00:00|1178000|1172000|
|2016-02-23 00:00:00|1179000|1181000|
|2016-02-22 00:00:00|1190000|1175000|
|2016-02-19 00:00:00|1187000|1190000|
|2016-02-18 00:00:00|1203000|1187000|
|2016-02-17 00:00:00|1179000|1185000|
|2016-02-16 00:00:00|1158000|1168000|
|2016-02-15 00:00:00|1154000|1154000|
|2016-02-12 00:00:00|1130000|1130000|
|2016-02-11 00:00:00|1118000|1130000|
|2016-02-05 00:00:00|1156000|1164000|
|2016-02-04 00:00:00|1150000|1156000|
|2016-02-03 00:00:00|1150000|1146000|
|2016-02-02 00:00:00|1161000|1156000|
|2016-02-01 00:00:00|1152000|1163000|
|2016-01-29 00:00:00|1140000|1150000|
|2016-01-28 00:00:00|1164000|1145000|
|2016-01-27 00:00:00|1126000|1175000|
+-------------------+-------+-------+
only showing top 20 rows



In [39]:
orders.filter((orders['Open'] > 1200000) & (orders['Open'] <1250000)).show()

+-------------------+-------+-------+-------+-------+------+
|               Date|   Open|   High|    Low|  Close|Volume|
+-------------------+-------+-------+-------+-------+------+
|2016-02-18 00:00:00|1203000|1203000|1178000|1187000|211795|
|2016-01-06 00:00:00|1208000|1208000|1168000|1175000|359895|
|2016-01-05 00:00:00|1202000|1218000|1186000|1208000|207947|
|2015-10-15 00:00:00|1244000|1282000|1243000|1269000|243476|
|2015-10-14 00:00:00|1248000|1260000|1237000|1254000|174719|
|2015-07-31 00:00:00|1220000|1222000|1175000|1185000|370146|
|2015-07-28 00:00:00|1224000|1251000|1219000|1230000|252036|
|2015-07-27 00:00:00|1229000|1247000|1228000|1230000|198204|
|2015-07-24 00:00:00|1227000|1238000|1224000|1229000|194869|
|2015-07-23 00:00:00|1244000|1253000|1234000|1234000|198639|
|2015-07-22 00:00:00|1244000|1260000|1235000|1253000|266557|
|2015-07-16 00:00:00|1223000|1287000|1223000|1282000|217793|
|2015-07-15 00:00:00|1225000|1238000|1224000|1235000|166863|
|2015-07-09 00:00:00|123

In [40]:
orders.filter((orders['Open'] > 1200000) | (orders['Open'] <1250000)).show()

+-------------------+-------+-------+-------+-------+------+
|               Date|   Open|   High|    Low|  Close|Volume|
+-------------------+-------+-------+-------+-------+------+
|2016-02-26 00:00:00|1180000|1187000|1172000|1172000|176906|
|2016-02-25 00:00:00|1172000|1187000|1172000|1179000|128321|
|2016-02-24 00:00:00|1178000|1179000|1161000|1172000|140407|
|2016-02-23 00:00:00|1179000|1189000|1173000|1181000|147578|
|2016-02-22 00:00:00|1190000|1192000|1166000|1175000|174075|
|2016-02-19 00:00:00|1187000|1195000|1174000|1190000|175889|
|2016-02-18 00:00:00|1203000|1203000|1178000|1187000|211795|
|2016-02-17 00:00:00|1179000|1201000|1169000|1185000|245929|
|2016-02-16 00:00:00|1158000|1179000|1157000|1168000|179087|
|2016-02-15 00:00:00|1154000|1160000|1144000|1154000|182471|
|2016-02-12 00:00:00|1130000|1151000|1122000|1130000|254115|
|2016-02-11 00:00:00|1118000|1137000|1118000|1130000|304899|
|2016-02-05 00:00:00|1156000|1169000|1156000|1164000|183280|
|2016-02-04 00:00:00|115

In [42]:
# 볼륨이 30 이하

orders.filter(orders['Volume']<=300000).show()

+-------------------+-------+-------+-------+-------+------+
|               Date|   Open|   High|    Low|  Close|Volume|
+-------------------+-------+-------+-------+-------+------+
|2016-02-26 00:00:00|1180000|1187000|1172000|1172000|176906|
|2016-02-25 00:00:00|1172000|1187000|1172000|1179000|128321|
|2016-02-24 00:00:00|1178000|1179000|1161000|1172000|140407|
|2016-02-23 00:00:00|1179000|1189000|1173000|1181000|147578|
|2016-02-22 00:00:00|1190000|1192000|1166000|1175000|174075|
|2016-02-19 00:00:00|1187000|1195000|1174000|1190000|175889|
|2016-02-18 00:00:00|1203000|1203000|1178000|1187000|211795|
|2016-02-17 00:00:00|1179000|1201000|1169000|1185000|245929|
|2016-02-16 00:00:00|1158000|1179000|1157000|1168000|179087|
|2016-02-15 00:00:00|1154000|1160000|1144000|1154000|182471|
|2016-02-12 00:00:00|1130000|1151000|1122000|1130000|254115|
|2016-02-05 00:00:00|1156000|1169000|1156000|1164000|183280|
|2016-02-04 00:00:00|1150000|1161000|1148000|1156000|236429|
|2016-02-03 00:00:00|115

In [43]:
# 볼륨이 30 이하 (부정)

orders.filter(~(orders['Volume']>300000)).show()

+-------------------+-------+-------+-------+-------+------+
|               Date|   Open|   High|    Low|  Close|Volume|
+-------------------+-------+-------+-------+-------+------+
|2016-02-26 00:00:00|1180000|1187000|1172000|1172000|176906|
|2016-02-25 00:00:00|1172000|1187000|1172000|1179000|128321|
|2016-02-24 00:00:00|1178000|1179000|1161000|1172000|140407|
|2016-02-23 00:00:00|1179000|1189000|1173000|1181000|147578|
|2016-02-22 00:00:00|1190000|1192000|1166000|1175000|174075|
|2016-02-19 00:00:00|1187000|1195000|1174000|1190000|175889|
|2016-02-18 00:00:00|1203000|1203000|1178000|1187000|211795|
|2016-02-17 00:00:00|1179000|1201000|1169000|1185000|245929|
|2016-02-16 00:00:00|1158000|1179000|1157000|1168000|179087|
|2016-02-15 00:00:00|1154000|1160000|1144000|1154000|182471|
|2016-02-12 00:00:00|1130000|1151000|1122000|1130000|254115|
|2016-02-05 00:00:00|1156000|1169000|1156000|1164000|183280|
|2016-02-04 00:00:00|1150000|1161000|1148000|1156000|236429|
|2016-02-03 00:00:00|115

In [47]:
# 날짜가 2월 26일인 것만

orders.filter(orders['Date']=='2016-02-26').show()

+-------------------+-------+-------+-------+-------+------+
|               Date|   Open|   High|    Low|  Close|Volume|
+-------------------+-------+-------+-------+-------+------+
|2016-02-26 00:00:00|1180000|1187000|1172000|1172000|176906|
+-------------------+-------+-------+-------+-------+------+



titanic

In [53]:
titanic = sqlCtx.read.csv('./data/titanic_train.csv',
                        header = True,
                        inferSchema = True)
type(titanic)

pyspark.sql.dataframe.DataFrame

In [54]:
titanic.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [55]:
titanic.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [56]:
titanic.columns

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

- count()

In [59]:
titanic.count()

891

- select() 변수선택 PassengerId, Name

In [62]:
titanic.select(['PassengerId', 'Name']).show()

+-----------+--------------------+
|PassengerId|                Name|
+-----------+--------------------+
|          1|Braund, Mr. Owen ...|
|          2|Cumings, Mrs. Joh...|
|          3|Heikkinen, Miss. ...|
|          4|Futrelle, Mrs. Ja...|
|          5|Allen, Mr. Willia...|
|          6|    Moran, Mr. James|
|          7|McCarthy, Mr. Tim...|
|          8|Palsson, Master. ...|
|          9|Johnson, Mrs. Osc...|
|         10|Nasser, Mrs. Nich...|
|         11|Sandstrom, Miss. ...|
|         12|Bonnell, Miss. El...|
|         13|Saundercock, Mr. ...|
|         14|Andersson, Mr. An...|
|         15|Vestrom, Miss. Hu...|
|         16|Hewlett, Mrs. (Ma...|
|         17|Rice, Master. Eugene|
|         18|Williams, Mr. Cha...|
|         19|Vander Planke, Mr...|
|         20|Masselmani, Mrs. ...|
+-----------+--------------------+
only showing top 20 rows



In [65]:
# 성별이 여성/ PassengerId, Name, Sex, Survived 출력

titanic.filter(titanic['Sex'] == 'female').select(['PassengerId', 'Name', 'Sex', 'Survived']).show()

+-----------+--------------------+------+--------+
|PassengerId|                Name|   Sex|Survived|
+-----------+--------------------+------+--------+
|          2|Cumings, Mrs. Joh...|female|       1|
|          3|Heikkinen, Miss. ...|female|       1|
|          4|Futrelle, Mrs. Ja...|female|       1|
|          9|Johnson, Mrs. Osc...|female|       1|
|         10|Nasser, Mrs. Nich...|female|       1|
|         11|Sandstrom, Miss. ...|female|       1|
|         12|Bonnell, Miss. El...|female|       1|
|         15|Vestrom, Miss. Hu...|female|       0|
|         16|Hewlett, Mrs. (Ma...|female|       1|
|         19|Vander Planke, Mr...|female|       0|
|         20|Masselmani, Mrs. ...|female|       1|
|         23|"McGowan, Miss. A...|female|       1|
|         25|Palsson, Miss. To...|female|       0|
|         26|Asplund, Mrs. Car...|female|       1|
|         29|"O'Dwyer, Miss. E...|female|       1|
|         32|Spencer, Mrs. Wil...|female|       1|
|         33|Glynn, Miss. Mary.

In [67]:
# 성별이 여성이면서 Survived한 사람/ PassengerId, Name, Sex, Survived 출력

titanic.filter((titanic.Sex =='female') & (titanic.Survived == 1) ).select(['PassengerId', 'Name', 'Sex', 'Survived']).show()

+-----------+--------------------+------+--------+
|PassengerId|                Name|   Sex|Survived|
+-----------+--------------------+------+--------+
|          2|Cumings, Mrs. Joh...|female|       1|
|          3|Heikkinen, Miss. ...|female|       1|
|          4|Futrelle, Mrs. Ja...|female|       1|
|          9|Johnson, Mrs. Osc...|female|       1|
|         10|Nasser, Mrs. Nich...|female|       1|
|         11|Sandstrom, Miss. ...|female|       1|
|         12|Bonnell, Miss. El...|female|       1|
|         16|Hewlett, Mrs. (Ma...|female|       1|
|         20|Masselmani, Mrs. ...|female|       1|
|         23|"McGowan, Miss. A...|female|       1|
|         26|Asplund, Mrs. Car...|female|       1|
|         29|"O'Dwyer, Miss. E...|female|       1|
|         32|Spencer, Mrs. Wil...|female|       1|
|         33|Glynn, Miss. Mary...|female|       1|
|         40|Nicola-Yarred, Mi...|female|       1|
|         44|Laroche, Miss. Si...|female|       1|
|         45|Devaney, Miss. Ma.

In [74]:
# 선실등급 별 요금 평균

# titanic.groupby('Pclass').mean('Fare').show()

# class_grp = titanic.groupBy('Pclass')
# class_grp.avg('Fare').show()


titanic.groupBy('Pclass').avg('Fare').sort('avg(Fare)', ascending = False).show()


+------+------------------+
|Pclass|         avg(Fare)|
+------+------------------+
|     1| 84.15468749999992|
|     2| 20.66218315217391|
|     3|13.675550101832997|
+------+------------------+



In [75]:
titanic.createOrReplaceTempView('titanicView')

In [77]:
sqlCtx.sql('select * from titanicView where sex = "female"').show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+--------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|    Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+--------+-----+--------+
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599| 71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|   7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|    53.1| C123|       S|
|          9|       1|     3|Johnson, Mrs. Osc...|female|27.0|    0|    2|          347742| 11.1333| null|       S|
|         10|       1|     2|Nasser, Mrs. Nich...|female|14.0|    1|    0|          237736| 30.0708| null|       C|
|         11|       1|     3|Sandstrom, Miss. ...|female| 4.0|    1|    

### Spark SQL

In [78]:
import pandas as pd

In [79]:
data1 = {'PassengerId':{0:1, 1:2, 2:3, 3:4, 4:5},
         'Name' : {0:'Owen', 1:'Florence', 2:'Laina', 3:'Lily', 4:"William"},
         'sex' : {0: 'male', 1: 'female', 2:'female', 3:'female', 4:'male'},
         'Survived': {0:0, 1:1, 2:1, 3:1, 4:0}
        }

data2 = {'PassengerId':{0:1, 1:2, 2:3, 3:4, 4:5},
         'Age' : {0: 22, 1: 38, 2: 33, 3: 35, 4: 35},
         'Fare' : {0: 7.3, 1: 71.3, 2:7.9, 3:53.1, 4:8.0},
         'Pclass': {0:3, 1:1, 2:3, 3:1, 4:3}
        }


In [81]:
data1.keys()

dict_keys(['PassengerId', 'Name', 'sex', 'Survived'])

pandas df -> spark df

In [83]:
sample_df01 = pd.DataFrame(data1, columns = data1.keys())
sample_df02 = pd.DataFrame(data2, columns = data2.keys())

display(sample_df01)
display(sample_df02)

type(sample_df01)

Unnamed: 0,PassengerId,Name,sex,Survived
0,1,Owen,male,0
1,2,Florence,female,1
2,3,Laina,female,1
3,4,Lily,female,1
4,5,William,male,0


Unnamed: 0,PassengerId,Age,Fare,Pclass
0,1,22,7.3,3
1,2,38,71.3,1
2,3,33,7.9,3
3,4,35,53.1,1
4,5,35,8.0,3


pandas.core.frame.DataFrame

In [86]:
# pandas -> spark type의 DF로 만드는 방법
spark_df01 = sqlCtx.createDataFrame(sample_df01)
# type(spark_df01)

spark_df02 = sqlCtx.createDataFrame(sample_df02)
# type(spark_df02)

In [87]:
spark_df01.printSchema()

root
 |-- PassengerId: long (nullable = true)
 |-- Name: string (nullable = true)
 |-- sex: string (nullable = true)
 |-- Survived: long (nullable = true)



In [88]:
spark_df02.printSchema()

root
 |-- PassengerId: long (nullable = true)
 |-- Age: long (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Pclass: long (nullable = true)



In [89]:
spark_df02.show()

+-----------+---+----+------+
|PassengerId|Age|Fare|Pclass|
+-----------+---+----+------+
|          1| 22| 7.3|     3|
|          2| 38|71.3|     1|
|          3| 33| 7.9|     3|
|          4| 35|53.1|     1|
|          5| 35| 8.0|     3|
+-----------+---+----+------+



In [90]:
# mirroring

spark_df01.createOrReplaceTempView('titanic01')
spark_df02.createOrReplaceTempView('titanic02')

- Spark SQL SELECT

In [92]:
sqlCtx.sql('select * from titanic01').show()

+-----------+--------+------+--------+
|PassengerId|    Name|   sex|Survived|
+-----------+--------+------+--------+
|          1|    Owen|  male|       0|
|          2|Florence|female|       1|
|          3|   Laina|female|       1|
|          4|    Lily|female|       1|
|          5| William|  male|       0|
+-----------+--------+------+--------+



In [93]:
sqlCtx.sql('select * from titanic01 where sex="male" ').show()

+-----------+-------+----+--------+
|PassengerId|   Name| sex|Survived|
+-----------+-------+----+--------+
|          1|   Owen|male|       0|
|          5|William|male|       0|
+-----------+-------+----+--------+



In [100]:
# 성별에 따른 생존자 수를 구해본다면?

sqlCtx.sql(""" select sex, sum(survived) as cnt
               from titanic01
               group by sex """).show()

+------+---+
|   sex|cnt|
+------+---+
|female|  3|
|  male|  0|
+------+---+



In [104]:
# 선실등급과 성별에 따른 Fare의 평균

sqlCtx.sql(""" select  pclass, avg(fare)
               from titanic01 t1
               join titanic02 t2 on (t1.passengerid = t2.passengerid) 
               group by (pclass)""").show()

+------+-----------------+
|pclass|        avg(fare)|
+------+-----------------+
|     1|             62.2|
|     3|7.733333333333334|
+------+-----------------+



실습

In [105]:
taxi_train_file_loc = "wasb://mllibwalkthroughs@cdspsparksamples.blob.core.windows.net/Data/NYCTaxi/JoinedTaxiTripFare.Point1Pct.Train.tsv"


In [106]:
taxi_train_file = spark.textFile(taxi_train_file_loc)

In [107]:
taxi_train_file.first()

Py4JJavaError: An error occurred while calling o730.partitions.
: java.io.IOException: No FileSystem for scheme: wasb
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:258)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:204)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
	at org.apache.spark.api.java.JavaRDDLike$class.partitions(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:745)
