In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("spark-dataframe").getOrCreate()
spark

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/16 15:02:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
filepath = "/home/ubuntu/working/spark-examples/data/titanic_train.csv"

titanic_sdf = spark.read.csv(filepath, inferSchema=True, header=True)
titanic_sdf.dtypes

                                                                                

[('PassengerId', 'int'),
 ('Survived', 'int'),
 ('Pclass', 'int'),
 ('Name', 'string'),
 ('Sex', 'string'),
 ('Age', 'double'),
 ('SibSp', 'int'),
 ('Parch', 'int'),
 ('Ticket', 'string'),
 ('Fare', 'double'),
 ('Cabin', 'string'),
 ('Embarked', 'string')]

In [3]:
titanic_pdf = titanic_sdf.toPandas()
titanic_pdf.head()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C123,S
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,,S


# 데이터 조작
- 데이터 프레임에 대한 삽입, 수정, 삭제 등

In [4]:
# Pandas에서 컬럼 수정 또는 생성 -> []
titanic_pdf_copy = titanic_pdf.copy()

In [6]:
# fare에 10 곱해서 Extra_Fare 생성

titanic_pdf_copy['Extra_Fare'] = titanic_pdf_copy['Fare']*10
titanic_pdf_copy

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked,Extra_Fare
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.2500,,S,72.500
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C,712.833
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.9250,,S,79.250
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1000,C123,S,531.000
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.0500,,S,80.500
...,...,...,...,...,...,...,...,...,...,...,...,...,...
886,887,0,2,"Montvila, Rev. Juozas",male,27.0,0,0,211536,13.0000,,S,130.000
887,888,1,1,"Graham, Miss. Margaret Edith",female,19.0,0,0,112053,30.0000,B42,S,300.000
888,889,0,3,"""Johnston, Miss. Catherine Helen """"Carrie""""""",female,,1,2,W./C. 6607,23.4500,,S,234.500
889,890,1,1,"Behr, Mr. Karl Howell",male,26.0,0,0,111369,30.0000,C148,C,300.000


In [7]:
# 데이터 수정. Extra_Fare + 20

titanic_pdf_copy['Extra_Fare'] = titanic_pdf_copy['Extra_Fare']+20
titanic_pdf_copy.head()

# - pandas df에서는 생성과 수정 방식이 같음.

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked,Extra_Fare
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S,92.5
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C,732.833
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S,99.25
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C123,S,551.0
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,,S,100.5


# Spark Dataframe 데이터 조작
- `withColumn()` method 이용해 기존 컬럼 값 수정, 타입 변경, 신규 컬럼 추가
    - `withColumn('신규 또는 업데이트 되는 컬럼명', '신규 또는 업데이트 되는 값')`
- 신규 또는 업데이트 되는 값을 생성 시에 기존 컬럼을 기반으로 한다면,
    - 신규 컬럼은 **문자열**로 지정
    - 기존 컬럼은 `col` 사용
- 신규 컬럼 추가는 `select`도 가능
- 컬럼명 변경은 `withColumnRenamed()` method 사용

In [8]:
import pyspark.sql.functions as F

titanic_sdf_copy = titanic_sdf.select('*')

In [9]:
# withColumn 이용해 Extra_Fare 생성

titanic_sdf_copy = titanic_sdf_copy.withColumn('Extra_Fare', F.col('Fare')*10)
titanic_sdf_copy.select('Fare', 'Extra_Fare').show(5)

+-------+----------+
|   Fare|Extra_Fare|
+-------+----------+
|   7.25|      72.5|
|71.2833|   712.833|
|  7.925|     79.25|
|   53.1|     531.0|
|   8.05|      80.5|
+-------+----------+
only showing top 5 rows



In [10]:
# 기존 컬럼 수정

titanic_sdf_copy = titanic_sdf_copy.withColumn('Extra_Fare', F.col('Extra_Fare')+20)
titanic_sdf_copy.select('Fare', 'Extra_Fare').show(5)

+-------+----------+
|   Fare|Extra_Fare|
+-------+----------+
|   7.25|      92.5|
|71.2833|   732.833|
|  7.925|     99.25|
|   53.1|     551.0|
|   8.05|     100.5|
+-------+----------+
only showing top 5 rows



In [11]:
# 컬럼의 타입 변환
# - Fare의 Type : Double -> Integer

titanic_sdf_copy = titanic_sdf_copy.withColumn('Fare', F.col('Fare').cast('Integer'))
titanic_sdf_copy.select('Fare').show(5)

+----+
|Fare|
+----+
|   7|
|  71|
|   7|
|  53|
|   8|
+----+
only showing top 5 rows



# 리터럴
- 프로그램이 언어에서 코드에 등장하는 직접적인 값들을 literal 이라고 함
- 리터럴은 상수

In [13]:
titanic_pdf_copy['Extra_Fare'] = 10
titanic_pdf_copy.head()

Unnamed: 0,PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked,Extra_Fare
0,1,0,3,"Braund, Mr. Owen Harris",male,22.0,1,0,A/5 21171,7.25,,S,10
1,2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Th...",female,38.0,1,0,PC 17599,71.2833,C85,C,10
2,3,1,3,"Heikkinen, Miss. Laina",female,26.0,0,0,STON/O2. 3101282,7.925,,S,10
3,4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35.0,1,0,113803,53.1,C123,S,10
4,5,0,3,"Allen, Mr. William Henry",male,35.0,0,0,373450,8.05,,S,10


In [14]:
# spark df에서 리터럴로 데이터를 삽입 또는 수정
# - 상수값으로 특정 컬럼 전체 데이터를 변경하려면 lit 함수 사용

titanic_sdf_copy = titanic_sdf_copy.withColumn('Extra_Fare', F.lit(10))
titanic_sdf_copy.select('Extra_Fare').show(5)

+----------+
|Extra_Fare|
+----------+
|        10|
|        10|
|        10|
|        10|
|        10|
+----------+
only showing top 5 rows



In [15]:
# select절을 활용해 컬럼 추가

titanic_sdf_copy = titanic_sdf_copy.select('*', F.substring('Cabin', 0, 1).alias('Cabin_Section'))
titanic_sdf_copy.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+----+-----+--------+----------+-------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|Fare|Cabin|Embarked|Extra_Fare|Cabin_Section|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+----+-----+--------+----------+-------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7| null|       S|        10|         null|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|  71|  C85|       C|        10|            C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|   7| null|       S|        10|         null|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|  53| C123|       S|        10|            C|
|          5|       0|     3|Allen, Mr. W

In [16]:
# F.split(컬럼명, 나눌 문자)
# - first_name 컬럼과 last_name 컬럼 추가. withColumn 사용
# - split 후 getItem(0), getItem(1)

titanic_sdf_copy = titanic_sdf_copy.withColumn('first_name', F.split('Name', ',').getItem(0))
titanic_sdf_copy = titanic_sdf_copy.withColumn('last_name', F.split('Name', ',').getItem(1))
titanic_sdf_copy.select('first_name', 'last_name').show()

+-------------+--------------------+
|   first_name|           last_name|
+-------------+--------------------+
|       Braund|     Mr. Owen Harris|
|      Cumings| Mrs. John Bradle...|
|    Heikkinen|         Miss. Laina|
|     Futrelle| Mrs. Jacques Hea...|
|        Allen|   Mr. William Henry|
|        Moran|           Mr. James|
|     McCarthy|       Mr. Timothy J|
|      Palsson| Master. Gosta Le...|
|      Johnson| Mrs. Oscar W (El...|
|       Nasser| Mrs. Nicholas (A...|
|    Sandstrom| Miss. Marguerite...|
|      Bonnell|     Miss. Elizabeth|
|  Saundercock|   Mr. William Henry|
|    Andersson|    Mr. Anders Johan|
|      Vestrom| Miss. Hulda Aman...|
|      Hewlett| Mrs. (Mary D Kin...|
|         Rice|      Master. Eugene|
|     Williams|  Mr. Charles Eugene|
|Vander Planke| Mrs. Julius (Eme...|
|   Masselmani|         Mrs. Fatima|
+-------------+--------------------+
only showing top 20 rows



In [18]:
titanic_sdf_copy = (titanic_sdf_copy
        .withColumn('first_name', F.split('Name', ',').getItem(0))
        .withColumn('last_name', F.split('Name', ',').getItem(1)))
titanic_sdf_copy.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+----+-----+--------+----------+-------------+----------+--------------------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|Fare|Cabin|Embarked|Extra_Fare|Cabin_Section|first_name|           last_name|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+----+-----+--------+----------+-------------+----------+--------------------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7| null|       S|        10|         null|    Braund|     Mr. Owen Harris|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|  71|  C85|       C|        10|            C|   Cumings| Mrs. John Bradle...|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|   7| null|       S|        10|         null| Heikkinen|        

In [19]:
# 컬럼 이름 변경 - 앞에 없는 이름 넣어도 에러 안남.
titanic_sdf_copy.withColumnRenamed('Fare', '요금').printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- 요금: integer (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- Extra_Fare: integer (nullable = false)
 |-- Cabin_Section: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- last_name: string (nullable = true)



In [20]:
spark.stop()