## dataframe

In [1]:
myList = [
    ('1','park, aa',190),
    ('1','lee, bb',195),
    ('2','lim, cc',200),
    ('2','lee',195)
]

In [2]:
myDf = spark.createDataFrame(myList)

In [3]:
myDf.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: long (nullable = true)



In [5]:
print(myDf.take(1))

[Row(_1='1', _2='park, aa', _3=190)]


In [6]:
myDf_with_columnname = spark.createDataFrame(myList, ['year','name','height'])

In [7]:
myDf_with_columnname.printSchema()

root
 |-- year: string (nullable = true)
 |-- name: string (nullable = true)
 |-- height: long (nullable = true)



In [8]:
print(myDf_with_columnname.take(1))

[Row(year='1', name='park, aa', height=190)]


In [15]:
myDf_with_columnname.show()

+----+--------+------+
|year|    name|height|
+----+--------+------+
|   1|park, aa|   190|
|   1| lee, bb|   195|
|   2| lim, cc|   200|
|   2|     lee|   195|
+----+--------+------+



## row 객체 사용하기

In [10]:
from pyspark.sql import Row
Person = Row('year','name','height')
row1 = Person('1','park, aa',190)
row2 = Person('1','lee, bb',195)
row3 = Person('2','lim, cc',200)
row4 = Person('2','lee',195)

In [11]:
myRows = [row1,
         row2,
         row3,
         row4]

In [12]:
myDf_row = spark.createDataFrame(myRows)

In [13]:
myDf_row.printSchema()

root
 |-- year: string (nullable = true)
 |-- name: string (nullable = true)
 |-- height: long (nullable = true)



In [16]:
print(myDf_row.take(1))

[Row(year='1', name='park, aa', height=190)]


In [17]:
myDf_row.show()

+----+--------+------+
|year|    name|height|
+----+--------+------+
|   1|park, aa|   190|
|   1| lee, bb|   195|
|   2| lim, cc|   200|
|   2|     lee|   195|
+----+--------+------+



## schema 정의하고 dataframe 생성하기

In [19]:
from pyspark.sql import Row
Person = Row('year','name','height')
row1 = Person('1','park, aa',190)
row2 = Person('1','lee, bb',195)
row3 = Person('2','lim, cc',200)
row4 = Person('2','lee',195)

myRows = [row1,
         row2,
         row3,
         row4]

In [20]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, IntegerType
mySchema=StructType([
    StructField("year", StringType(), True),
    StructField("name", StringType(), True),
    StructField("height", IntegerType(), True)
])

In [21]:
myDf_schema=spark.createDataFrame(myRows, mySchema)

In [22]:
myDf_schema.printSchema()

root
 |-- year: string (nullable = true)
 |-- name: string (nullable = true)
 |-- height: integer (nullable = true)



In [23]:
myDf_schema.take(1)

[Row(year='1', name='park, aa', height=190)]

In [24]:
myDf_schema.show()

+----+--------+------+
|year|    name|height|
+----+--------+------+
|   1|park, aa|   190|
|   1| lee, bb|   195|
|   2| lim, cc|   200|
|   2|     lee|   195|
+----+--------+------+



# RDD - schema 자동 인식

In [25]:
myList = [
    ('1','park, aa',190),
    ('1','lee, bb',195),
    ('2','lim, cc',200),
    ('2','lee',195)
]

In [26]:
myRdd = spark.sparkContext.parallelize(myList)

## toDF()

In [27]:
rddDf = myRdd.toDF()

In [28]:
rddDf.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: long (nullable = true)



In [29]:
rddDf.show()

+---+--------+---+
| _1|      _2| _3|
+---+--------+---+
|  1|park, aa|190|
|  1| lee, bb|195|
|  2| lim, cc|200|
|  2|     lee|195|
+---+--------+---+



## createDataFrame()

In [30]:
rddDf1 = spark.createDataFrame(myRdd)

In [31]:
rddDf1.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: long (nullable = true)



In [32]:
rddDf1.show()

+---+--------+---+
| _1|      _2| _3|
+---+--------+---+
|  1|park, aa|190|
|  1| lee, bb|195|
|  2| lim, cc|200|
|  2|     lee|195|
+---+--------+---+



## DBMS와 유사한 dataframe

In [33]:
rddDf.where(rddDf._3 >= 195)\
     .select([rddDf._1,rddDf._2])\
     .show()

+---+-------+
| _1|     _2|
+---+-------+
|  1|lee, bb|
|  2|lim, cc|
|  2|    lee|
+---+-------+



In [38]:
rddDf.groupby(rddDf._1).max().show()

+---+-------+
| _1|max(_3)|
+---+-------+
|  1|    195|
|  2|    200|
+---+-------+



## schema 정의하고 dataframe 생성하기

In [44]:
myList = [
    ('1','park, aa',190),
    ('1','lee, bb',195),
    ('2','lim, cc',200),
    ('2','lee',195)
]

In [45]:
myRdd = spark.sparkContext.parallelize(myList)

In [40]:
schema=StructType([
    StructField("year", StringType(), True),
    StructField("name", StringType(), True),
    StructField("height", IntegerType(), True)
])

In [41]:
myDf_schema = spark.createDataFrame(myRdd,schema)

In [42]:
myDf_schema.printSchema()

root
 |-- year: string (nullable = true)
 |-- name: string (nullable = true)
 |-- height: integer (nullable = true)



In [43]:
myDf_schema.show()

+----+--------+------+
|year|    name|height|
+----+--------+------+
|   1|park, aa|   190|
|   1| lee, bb|   195|
|   2| lim, cc|   200|
|   2|     lee|   195|
+----+--------+------+



# 2

In [5]:
%%writefile data/people.csv
park, 26
lee, 28
kim, 21
lim, 42

Writing data/people.csv


In [10]:
from pyspark.sql import Row
cfile = os.path.join("data","people.csv")
lines = spark.sparkContext.textFile(cfile)

#sparkContext.textFile() 함수로 읽은 파일은 RDD이다.

In [11]:
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1].strip())))

_myDf = spark.createDataFrame(people)
#RDD에서 Row()를 사용하여 Dataframe으로 변환한다.

In [12]:
_myDf.printSchema()
_myDf.collect()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



[Row(age=26, name='park'),
 Row(age=28, name='lee'),
 Row(age=21, name='kim'),
 Row(age=42, name='lim')]

In [13]:
_myDf.show()

+---+----+
|age|name|
+---+----+
| 26|park|
| 28| lee|
| 21| kim|
| 42| lim|
+---+----+



## DataFrame으로 직접 읽기

- csv 패키지를 사용해서 읽어 본다. 우선 Spark의 csv 패키지를 추가한다. 패키지는 설정파일 spark-defaults.conf에 추가할 수 있다.

## csv 파일 읽기
- csv는 ,로 분리된 파일을 말한다.

In [14]:
%%writefile data/ds_spark.csv
1,2,3,4
11,22,33,44
111,222,333,444

Writing data/ds_spark.csv


In [15]:
df = spark.read.format('com.databricks.spark.csv')\
    .options(header='true', inferschema='true').load('data/ds_spark.csv')
df.show()

+---+---+---+---+
|  1|  2|  3|  4|
+---+---+---+---+
| 11| 22| 33| 44|
|111|222|333|444|
+---+---+---+---+



## tsv 파일 읽기
- tsv는 Tab으로 분리된 파일을 말한다. '\t'이 포함되어 있는 경우, Spark는 string으로 데이터타잎을 설정한다.

In [16]:
import numpy as np
np.array([float(x) for x in '1.658985	4.285136'.split('\t')])

array([1.658985, 4.285136])

In [17]:
import numpy as np
np.array([float(x) for x in '1.658985 4.285136'.split(' ')])

array([1.658985, 4.285136])

In [18]:
%%writefile data/ds_spark_heightweight.txt
1	65.78	112.99
2	71.52	136.49
3	69.40	153.03
4	68.22	142.34
5	67.79	144.30
6	68.70	123.30
7	69.80	141.49
8	70.01	136.46
9	67.90	112.37
10	66.78	120.67
11	66.49	127.45
12	67.62	114.14
13	68.30	125.61
14	67.12	122.46
15	68.28	116.09
16	71.09	140.00
17	66.46	129.50
18	68.65	142.97
19	71.23	137.90
20	67.13	124.04
21	67.83	141.28
22	68.88	143.54
23	63.48	97.90
24	68.42	129.50
25	67.63	141.85
26	67.21	129.72
27	70.84	142.42
28	67.49	131.55
29	66.53	108.33
30	65.44	113.89
31	69.52	103.30
32	65.81	120.75
33	67.82	125.79
34	70.60	136.22
35	71.80	140.10
36	69.21	128.75
37	66.80	141.80
38	67.66	121.23
39	67.81	131.35
40	64.05	106.71
41	68.57	124.36
42	65.18	124.86
43	69.66	139.67
44	67.97	137.37
45	65.98	106.45
46	68.67	128.76
47	66.88	145.68
48	67.70	116.82
49	69.82	143.62
50	69.09	134.93

Overwriting data/ds_spark_heightweight.txt


In [19]:
from pyspark.sql.types import *
rdd=spark.sparkContext\
    .textFile(os.path.join('data','ds_spark_heightweight.txt'))

tRdd=rdd.map(lambda x:x.split('\t'))
tDf=spark.createDataFrame(tRdd)

In [20]:
tDf.printSchema()

root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: string (nullable = true)



In [21]:
tDf.take(1)

[Row(_1='1', _2='65.78', _3='112.99')]

In [22]:
tDf.show()

+---+-----+------+
| _1|   _2|    _3|
+---+-----+------+
|  1|65.78|112.99|
|  2|71.52|136.49|
|  3|69.40|153.03|
|  4|68.22|142.34|
|  5|67.79|144.30|
|  6|68.70|123.30|
|  7|69.80|141.49|
|  8|70.01|136.46|
|  9|67.90|112.37|
| 10|66.78|120.67|
| 11|66.49|127.45|
| 12|67.62|114.14|
| 13|68.30|125.61|
| 14|67.12|122.46|
| 15|68.28|116.09|
| 16|71.09|140.00|
| 17|66.46|129.50|
| 18|68.65|142.97|
| 19|71.23|137.90|
| 20|67.13|124.04|
+---+-----+------+
only showing top 20 rows



## withcolumn
- withColumn()은 열을 추가한다. 기존에 있는 _1행을 integer로 형변환해서 id행을 만들고, 기존의 _1행을 삭제해보자.

- drop()은 열을 삭제할 때 사용한다.

In [23]:
tDf=tDf.withColumn("id",tDf['_1'].cast("integer")).drop('_1')
tDf=tDf.withColumn("height",tDf['_2'].cast("double")).drop('_2')
tDf=tDf.withColumn("weight",tDf['_3'].cast("double")).drop('_3')

In [24]:
tDf.show()

+---+------+------+
| id|height|weight|
+---+------+------+
|  1| 65.78|112.99|
|  2| 71.52|136.49|
|  3|  69.4|153.03|
|  4| 68.22|142.34|
|  5| 67.79| 144.3|
|  6|  68.7| 123.3|
|  7|  69.8|141.49|
|  8| 70.01|136.46|
|  9|  67.9|112.37|
| 10| 66.78|120.67|
| 11| 66.49|127.45|
| 12| 67.62|114.14|
| 13|  68.3|125.61|
| 14| 67.12|122.46|
| 15| 68.28|116.09|
| 16| 71.09| 140.0|
| 17| 66.46| 129.5|
| 18| 68.65|142.97|
| 19| 71.23| 137.9|
| 20| 67.13|124.04|
+---+------+------+
only showing top 20 rows



## 형변환
- 위 tsv 파일에서 생성한 RDD를 탭으로 분리하면서, float()로 형변환을 해보자.

In [25]:
import numpy as np
#myRdd=rdd.map(lambda line:np.array([float(x) for x in line.split('\t')]))
rdd=spark.sparkContext.textFile(os.path.join('data','ds_spark_heightweight.txt'))
tRdd=rdd.map(lambda line:[float(x) for x in line.split('\t')])
tRdd.take(1)

[[1.0, 65.78, 112.99]]

## dataframe 생성

In [26]:
rdd=spark.sparkContext.textFile(os.path.join('data','ds_spark_heightweight.txt'))
tRdd=rdd.map(lambda line:[float(x) for x in line.split('\t')])

In [27]:
tDf=spark.createDataFrame(tRdd,["id","weight","height"])

In [28]:
tDf.printSchema()

root
 |-- id: double (nullable = true)
 |-- weight: double (nullable = true)
 |-- height: double (nullable = true)



In [29]:
tDf.show()

+----+------+------+
|  id|weight|height|
+----+------+------+
| 1.0| 65.78|112.99|
| 2.0| 71.52|136.49|
| 3.0|  69.4|153.03|
| 4.0| 68.22|142.34|
| 5.0| 67.79| 144.3|
| 6.0|  68.7| 123.3|
| 7.0|  69.8|141.49|
| 8.0| 70.01|136.46|
| 9.0|  67.9|112.37|
|10.0| 66.78|120.67|
|11.0| 66.49|127.45|
|12.0| 67.62|114.14|
|13.0|  68.3|125.61|
|14.0| 67.12|122.46|
|15.0| 68.28|116.09|
|16.0| 71.09| 140.0|
|17.0| 66.46| 129.5|
|18.0| 68.65|142.97|
|19.0| 71.23| 137.9|
|20.0| 67.13|124.04|
+----+------+------+
only showing top 20 rows

