In [1]:
# %load requirements.txt

import findspark
findspark.init()

import pyspark
print(findspark.find())

from pyspark import SparkContext, SparkConf

conf = SparkConf().setMaster("local").setAppName("Spark Practice 1 ")

sc = SparkContext(conf=conf)

from pyspark.sql import SparkSession
spark = SparkSession(sc)


/opt/spark


## 참조자료 

https://wikidocs.net/28532

# 1. 직접 데이터프레임 생성하기 

In [2]:
import numpy as np

In [3]:
hasattr(spark, "createDataFrame")

True

In [4]:
from pyspark.sql.types import FloatType, StringType

df = spark.createDataFrame([1.0, 2.0, 3.0], StringType())

In [5]:
df

DataFrame[value: string]

In [6]:
SparkSession.__dict__['createDataFrame']

<function pyspark.sql.session.SparkSession.createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=True)>

In [7]:
spark.createDataFrame

<bound method SparkSession.createDataFrame of <pyspark.sql.session.SparkSession object at 0x11a0ce1d0>>

In [8]:
spark

In [9]:
spark.createDataFrame.__self__

In [10]:
df.show()

+-----+
|value|
+-----+
|  1.0|
|  2.0|
|  3.0|
+-----+



# 2.  RDD에서 데이터프레임으로 변환하기

In [11]:
rdd = sc.parallelize([(1,2,3),(4,5,6),(7,8,9)])
df = rdd.toDF(["a","b","c"])

In [12]:
hasattr(rdd, "toDF")

True

In [13]:
dir(rdd)

['__add__',
 '__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getnewargs__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_computeFractionForSampleSize',
 '_defaultReducePartitions',
 '_id',
 '_is_barrier',
 '_jrdd',
 '_jrdd_deserializer',
 '_memory_limit',
 '_pickled',
 '_reserialize',
 '_to_java_object_rdd',
 'aggregate',
 'aggregateByKey',
 'barrier',
 'cache',
 'cartesian',
 'checkpoint',
 'coalesce',
 'cogroup',
 'collect',
 'collectAsMap',
 'combineByKey',
 'context',
 'count',
 'countApprox',
 'countApproxDistinct',
 'countByKey',
 'countByValue',
 'ctx',
 'distinct',
 'filter',
 'first',
 'flatMap',
 'flatMapValues',
 'fold',
 'foldByKey',
 'foreach',
 'foreachPartition',
 'fullOuterJoin',
 'getCheckpo

In [14]:
df.show()

+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  2|  3|
|  4|  5|  6|
|  7|  8|  9|
+---+---+---+



In [15]:
df.__getattribute__('show')()

+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  2|  3|
|  4|  5|  6|
|  7|  8|  9|
+---+---+---+



In [16]:
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: long (nullable = true)
 |-- c: long (nullable = true)



### Row 클래스를 사용해서 행 추가하기 

In [17]:
from pyspark.sql import Row
rdd = sc.parallelize([Row(a=1,b=2,c=3),Row(a=4,b=5,c=6),Row(a=7,b=8,c=9)])
df_ = rdd.toDF()

In [18]:
df_.show()

+---+---+---+
|  a|  b|  c|
+---+---+---+
|  1|  2|  3|
|  4|  5|  6|
|  7|  8|  9|
+---+---+---+



In [19]:
peopleRDD = sc.parallelize( [ ("David", 150), ("White", 200), ("Paul",  170)  ])


In [20]:
peopleRDD

ParallelCollectionRDD[32] at parallelize at PythonRDD.scala:195

###  두개의 칼럼 정보를 리스트에 넣고 처리할 것 

In [21]:
peopleDF = peopleRDD.toDF( ["name", "salary" ])

In [22]:
peopleDF

DataFrame[name: string, salary: bigint]

In [23]:
peopleDF.show()

+-----+------+
| name|salary|
+-----+------+
|David|   150|
|White|   200|
| Paul|   170|
+-----+------+



# 3. RDD에  스키마 만들고 데이터프레임으로 변환하기 

In [24]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [25]:
peopleRDD = sc.parallelize([Row("David", 150),Row("White", 200),Row("Paul",  170) ] )

In [26]:
peopleSchema = StructType().add(StructField("name",   StringType(), True)).add(StructField("salary", IntegerType(), True))

In [27]:
peopleDF = spark.createDataFrame(peopleRDD, peopleSchema)

In [28]:
peopleDF.show()

+-----+------+
| name|salary|
+-----+------+
|David|   150|
|White|   200|
| Paul|   170|
+-----+------+



## 데이터 프레임의 스키마 정보 확인

df.printSchema() 로 스키마를 확인해볼 수 있다.

df.columns 로 컬럼을 확인해 볼 수 있고, 파이썬 리스트로 반환된다.

df.describe() 로 데이터프레임 내용을 확인해 볼 수 있다. 이 메서드로는 데이터프레임이 리턴되는것을 확인해볼 수 있다.

In [29]:
peopleDF.printSchema() 

root
 |-- name: string (nullable = true)
 |-- salary: integer (nullable = true)



In [30]:
peopleDF.columns

['name', 'salary']

In [31]:
peopleDF.describe()

DataFrame[summary: string, name: string, salary: string]

## JSON 파일을 만들어서 데이터프레임으로 변환하기

In [32]:
%%writefile people.json
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}

Overwriting people.json


In [33]:
df = spark.read.json("./people.json")

# df = dataframe!

df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [34]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



## 텍스트 파일을 만들어서 데이터프레임으로 변환하기

In [35]:
%%writefile people.txt
A, 29
B, 30
C, 19
D, 15
F, 20

Overwriting people.txt


In [36]:
peopleRDD = sc.textFile("./people.txt")

In [37]:
peopleRDD 

./people.txt MapPartitionsRDD[64] at textFile at NativeMethodAccessorImpl.java:0

In [38]:
peopleRDD .collect()

['A, 29', 'B, 30', 'C, 19', 'D, 15', 'F, 20']

### 스키마 정보 입력하기

In [39]:
peopleSchema = StructType().add(StructField("name",   StringType(), True)).add(StructField("age", IntegerType(), True))

### 텍스트 파일을 분리해서 Row 클래스로 변호나하기

In [40]:
sepPeopleRdd = peopleRDD.map(lambda line  :  line.split(",")).map(lambda x  :  Row(x[0], int(x[1])))

In [41]:
peopleDF = spark.createDataFrame(sepPeopleRdd, peopleSchema)

In [42]:
peopleDF.show()

+----+---+
|name|age|
+----+---+
|   A| 29|
|   B| 30|
|   C| 19|
|   D| 15|
|   F| 20|
+----+---+



In [43]:
#### df.select('age').show() 로 위 컬럼의  데이터를 가져올 수 있다.

In [56]:
peopleDF.select("name")

DataFrame[name: string]

In [67]:
peopleDF.select("age").where("age > 20").explain()

== Physical Plan ==
*(1) Project [age#210]
+- *(1) Filter (isnotnull(age#210) && (age#210 > 20))
   +- Scan ExistingRDD[name#209,age#210]


In [58]:
peopleDF.select("age").explain()

== Physical Plan ==
*(1) Project [age#210]
+- Scan ExistingRDD[name#209,age#210]


In [68]:
peopleDF.select("age").where("age > 20").show()

+---+
|age|
+---+
| 29|
| 30|
+---+



###   스칼라 언어 처리 : 

      df.select($"name", $"age" + 1).show()

In [69]:
peopleDF.select(peopleDF.name, peopleDF.age + 1).explain() 

== Physical Plan ==
*(1) Project [name#209, (age#210 + 1) AS (age + 1)#304]
+- Scan ExistingRDD[name#209,age#210]


In [70]:
peopleDF.select(peopleDF.name, peopleDF.age + 1).show()

+----+---------+
|name|(age + 1)|
+----+---------+
|   A|       30|
|   B|       31|
|   C|       20|
|   D|       16|
|   F|       21|
+----+---------+



In [46]:
### df.select(['age', 'name']) 의 식으로 다중 컬럼을 선택할 수도 있다

In [47]:
peopleDF.select(["name","age"]).show()

+----+---+
|name|age|
+----+---+
|   A| 29|
|   B| 30|
|   C| 19|
|   D| 15|
|   F| 20|
+----+---+



### df.head(2) 로 위에 있는 두 가지 row를 가져올 수 있다. df.head(2)[0] 의 식으로 파이썬에서 하던 인덱싱을 할 수도 있다. 아주 간편하다.

In [48]:
peopleDF.head()

Row(name='A', age=29)

In [49]:
peopleDF.head()[0]

'A'

In [50]:
peopleDF.head()['name']

'A'

### 필터 처리하기

In [51]:
peopleDF.filter("age> 21").show()

+----+---+
|name|age|
+----+---+
|   A| 29|
|   B| 30|
+----+---+



In [52]:
peopleDF.select(["name", "age"]).filter("age > 21").show()

+----+---+
|name|age|
+----+---+
|   A| 29|
|   B| 30|
+----+---+



## groupby 처리

In [53]:
peopleDF.groupBy("age").count().show()

+---+-----+
|age|count|
+---+-----+
| 20|    1|
| 19|    1|
| 15|    1|
| 29|    1|
| 30|    1|
+---+-----+



## 메소드들 

컬럼을 좀 더 다뤄보자.

df.withColumn("새컬럼", df['age']) 으로 새 컬럼을 만들 수 있다. withColumn() 은 아규먼트로 컬럼 이름과, sql.컬럼을 받기 때문에 주의한다.

df.withColumn("두배', df['age']*2) 와 같이 컬럼 내 데이터들을 손봐서 새 컬럼을 추가하는 데 사용할 수 있다.

df.withColumnRenamed("age", "renamed_age").show() 로 컬럼 이름을 바꾼 후 내용을 확인해볼 수 있다.

sql 모듈에서는 sql을 다루는 쉬운 메서드들이 많다.

df.createOrReplaceTempView("새로만들 테이블") 로 테이블을 만들듯 sql 임시 뷰를 만들 수 있다.

results = spark.sql("select * from 새로만들 테이블") 식으로, sql을 다루듯 사용할 수 있으므로 별다른 많은 공부를 요구하지 않는다. 아주 쉽다. 아래와 같이 제플린에서 사용한다면 될 것이다.


In [54]:
#### df['age'] 로 인덱싱을 해 보면 파이스파크 컬럼을 얻을 수 있다

In [55]:
peopleDF['name']

Column<b'name'>