### [PySpark API 도큐먼트](https://spark.apache.org/docs/latest/api/python/index.html)

# **pyspark 패키지를 활용한 Spark 프로그래밍(2)**
## SparkSession 객체 생성

In [1]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[2]") \
                    .appName('sparkedu') \
                    .getOrCreate()
spark
#spark.stop()

## <span style='color:red'>**RDD**</span>
### Resilient Distributed Dataset의 약자(탄력 분산 데이터셋)
### 분산되어 존재하는 데이터들의 모임, 즉 클러스터에 분배되어 있는 데이터들을 하나로 관리하는 개념
### 스파크의 모든 데이터 타입들은 RDD를 기반으로 만들어지고 데이터끼리의 연산들은 RDD의 연산으로 이루어져 있음

In [2]:
greetRDD = spark.sparkContext.textFile('data/greeting.txt')
print(greetRDD)
greetRDD.collect()

data/greeting.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0


['Good Morning',
 'Good Evening',
 'Good Day',
 'Happy Birthday',
 'Happy New Year']

In [3]:
goodLines = greetRDD.filter(lambda x : "Good" in x)
goodLines.collect()

['Good Morning', 'Good Evening', 'Good Day']

In [4]:
goodLines.count()

3

In [5]:
numbers = spark.sparkContext.parallelize(list(range(5)))
squared = numbers.map(lambda x : x * x).collect()
squared

[0, 1, 4, 9, 16]

In [6]:
strings = spark.sparkContext.parallelize(["hello spark", "hi python"])
splitted = strings.flatMap(lambda x : x.split(" ")).collect()
splitted

['hello', 'spark', 'hi', 'python']

In [7]:
numbers = spark.sparkContext.parallelize(list(range(1, 30, 3)))
result = numbers.filter(lambda x : x % 2 == 0).collect()
result

[4, 10, 16, 22, 28]

In [8]:
linesRDD = spark.sparkContext.parallelize(["test", "this is a test rdd"])
linesRDD

ParallelCollectionRDD[10] at readRDDFromFile at PythonRDD.scala:262

## <span style='color:red'>**페어 RDD**</span>
### 페어 RDD란 key-value쌍으로 이루어진 RDD
### 파이썬에서는 Tuple로 이뤄진 RDD가 곧 페어 RDD가 됨

In [9]:
examplePairRDD = spark.sparkContext.parallelize([(1, 3), (1, 5), (2, 4), (3, 3), (4, 8), (4, 2), (3, 1)])
print(examplePairRDD)
examplePairRDD.collect()

ParallelCollectionRDD[11] at readRDDFromFile at PythonRDD.scala:262


[(1, 3), (1, 5), (2, 4), (3, 3), (4, 8), (4, 2), (3, 1)]

- reduceByKey(func) : 동일 키에 대한 값들을 reduce(예 : rdd.reduceByKey(lambda x, y: x + y))
- mapValues(func) : 각 키에 대해 연산을 적용(예 : rdd.mapValues(lambda x : x + 1))
- sortByKey() : 키로 정렬한 RDD 리턴(예 : rdd.sortByKey())
- keys() : 키값들을 리턴(예 : rdd.keys())
- values() : value값들을 리턴(예 : rdd.values())

In [10]:
examplePairRDD.reduceByKey(lambda x, y : x + y).collect()

[(2, 4), (4, 10), (1, 8), (3, 4)]

In [11]:
examplePairRDD.mapValues(lambda x: x**2).collect()

[(1, 9), (1, 25), (2, 16), (3, 9), (4, 64), (4, 4), (3, 1)]

In [13]:
customerLines = spark.sparkContext.textFile("data/name-customers.csv")
customerLines.first()

Py4JJavaError: An error occurred while calling o136.partitions.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:/D:/PYTHON기반_빅데이터처리/PyStexam/data/name-customers.csv
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:287)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:205)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:276)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:272)
	at org.apache.spark.api.java.JavaRDDLike.partitions(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.JavaRDDLike.partitions$(JavaRDDLike.scala:61)
	at org.apache.spark.api.java.AbstractJavaRDDLike.partitions(JavaRDDLike.scala:45)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [None]:
customerPairs = customerLines.map(lambda x: (x.split(",")[1], x.split(",")[0]))
customerPairs

In [None]:
customerPairCollected = customerPairs.groupByKey().collect()
customerDict = {
    country : [c for c in customers]
    for country, customers in customerPairCollected
}
customerDict['UK']

In [None]:
[k for k in customerPairs.sortByKey().keys().collect()][:10]

In [None]:
mapReduced = customerPairs.mapValues(lambda x : 1).reduceByKey(lambda x, y: x + y)
{
    i:j for i, j in mapReduced.collect()
}

## RDD를 가지고 워드카운팅하는 예제

In [14]:
lines = spark.sparkContext.textFile("data/greeting.txt")
sorted(lines.flatMap(lambda line: line.split()).map(lambda w: (w,1)).reduceByKey(lambda v1, v2: v1+v2).collect())

[('Birthday', 1),
 ('Day', 1),
 ('Evening', 1),
 ('Good', 3),
 ('Happy', 2),
 ('Morning', 1),
 ('New', 1),
 ('Year', 1)]

In [15]:
rdd1 = spark.sparkContext.textFile("data/greeting.txt")
print(type(rdd1))
print(rdd1)
print(rdd1.collect())
print("------------------------------------------------------------------------------")
rdd2 = rdd1.flatMap(lambda line: line.split())
print(type(rdd2))
print(rdd2)
print(rdd2.collect())
print("------------------------------------------------------------------------------")
rdd3 = rdd2.map(lambda w: (w,1))
print(type(rdd3))
print(rdd3)      
print(rdd3.collect())
print("------------------------------------------------------------------------------")
rdd4 = rdd3.reduceByKey(lambda v1, v2: v1+v2)
print(type(rdd4))
print(rdd4)
print(rdd4.collect())
print("------------------------------------------------------------------------------")
result = rdd4.collect()
print(type(result))
print(result)
print("------------------------------------------------------------------------------")
print(sorted(result))

<class 'pyspark.rdd.RDD'>
data/greeting.txt MapPartitionsRDD[30] at textFile at NativeMethodAccessorImpl.java:0
['Good Morning', 'Good Evening', 'Good Day', 'Happy Birthday', 'Happy New Year']
------------------------------------------------------------------------------
<class 'pyspark.rdd.PipelinedRDD'>
PythonRDD[31] at RDD at PythonRDD.scala:53
['Good', 'Morning', 'Good', 'Evening', 'Good', 'Day', 'Happy', 'Birthday', 'Happy', 'New', 'Year']
------------------------------------------------------------------------------
<class 'pyspark.rdd.PipelinedRDD'>
PythonRDD[32] at RDD at PythonRDD.scala:53
[('Good', 1), ('Morning', 1), ('Good', 1), ('Evening', 1), ('Good', 1), ('Day', 1), ('Happy', 1), ('Birthday', 1), ('Happy', 1), ('New', 1), ('Year', 1)]
------------------------------------------------------------------------------
<class 'pyspark.rdd.PipelinedRDD'>
PythonRDD[37] at RDD at PythonRDD.scala:53
[('Good', 3), ('Morning', 1), ('Evening', 1), ('Birthday', 1), ('New', 1), ('Year',

## 파일 로딩(JSON, CSV)

In [16]:
import json
carsJson = spark.sparkContext.textFile("./data/cars.json")\
              .map(lambda x: json.loads(x))
carsJson

PythonRDD[40] at RDD at PythonRDD.scala:53

In [17]:
inputJson.first()

NameError: name 'inputJson' is not defined

In [None]:
inputJson.collect()

## RDD를 가지고 Hive가상테이블 생성 ~> SQL을 사용해서 데이터 처리

In [18]:
emp = spark.read.csv("data/emp.csv", header=True, inferSchema=True)

In [19]:
from pyspark.sql import HiveContext
hiveCtx = HiveContext(spark.sparkContext)

In [20]:
emp.registerTempTable("hiveemp")
emp

DataFrame[empno: int, ename: string, job: string, mgr: int, hiredate: string, sal: int, comm: int, deptno: int]

In [21]:
empResult = hiveCtx.sql("SELECT ename, sal FROM hiveemp")
empResult.collect()[:5]

[Row(ename='SMITH', sal=800),
 Row(ename='ALLEN', sal=1600),
 Row(ename='WARD', sal=1250),
 Row(ename='JONES', sal=2975),
 Row(ename='MARTIN', sal=1250)]

In [22]:
empResult = hiveCtx.sql("SELECT * FROM hiveemp order by sal")
empResult.collect()

[Row(empno=7369, ename='SMITH', job='CLERK', mgr=7902, hiredate='1980-12-17', sal=800, comm=None, deptno=20),
 Row(empno=7900, ename='JAMES', job='CLERK', mgr=7698, hiredate='1981-12-03', sal=950, comm=None, deptno=30),
 Row(empno=7876, ename='ADAMS', job='CLERK', mgr=7788, hiredate='1983-01-12', sal=1100, comm=None, deptno=20),
 Row(empno=7521, ename='WARD', job='SALESMAN', mgr=7698, hiredate='1981-02-03', sal=1250, comm=500, deptno=30),
 Row(empno=7654, ename='MARTIN', job='SALESMAN', mgr=7698, hiredate='1981-10-22', sal=1250, comm=1400, deptno=30),
 Row(empno=7934, ename='MILLER', job='CLERK', mgr=7782, hiredate='1982-01-25', sal=1300, comm=None, deptno=10),
 Row(empno=7844, ename='TURNER', job='SALESMAN', mgr=7698, hiredate='1984-10-08', sal=1500, comm=None, deptno=30),
 Row(empno=7499, ename='ALLEN', job='SALESMAN', mgr=7698, hiredate='1981-02-20', sal=1600, comm=300, deptno=30),
 Row(empno=7782, ename='CLARK', job='MANAGER', mgr=7839, hiredate='1981-09-06', sal=2450, comm=None, d

## RDD를 가지고 임시뷰 생성 ~> SQL을 사용해서 데이터 처리

In [23]:
emp.createOrReplaceTempView("empview")

In [24]:
sparkdf = spark.sql("select * from empview")
print(type(sparkdf))
sparkdf.show()

<class 'pyspark.sql.dataframe.DataFrame'>
+-----+------+---------+----+----------+----+----+------+
|empno| ename|      job| mgr|  hiredate| sal|comm|deptno|
+-----+------+---------+----+----------+----+----+------+
| 7369| SMITH|    CLERK|7902|1980-12-17| 800|null|    20|
| 7499| ALLEN| SALESMAN|7698|1981-02-20|1600| 300|    30|
| 7521|  WARD| SALESMAN|7698|1981-02-03|1250| 500|    30|
| 7566| JONES|  MANAGER|7839|1981-03-02|2975|null|    20|
| 7654|MARTIN| SALESMAN|7698|1981-10-22|1250|1400|    30|
| 7698| BLAKE|  MANAGER|7839|1981-05-01|2850|null|    30|
| 7782| CLARK|  MANAGER|7839|1981-09-06|2450|null|    10|
| 7788| SCOTT|  ANALYST|7566|1982-12-08|3000|null|    20|
| 7839|  KING|PRESIDENT|null|1981-11-17|5000|null|    10|
| 7844|TURNER| SALESMAN|7698|1984-10-08|1500|null|    30|
| 7876| ADAMS|    CLERK|7788|1983-01-12|1100|null|    20|
| 7900| JAMES|    CLERK|7698|1981-12-03| 950|null|    30|
| 7902|  FORD|  ANALYST|7566|1981-12-13|3000|null|    20|
| 7934|MILLER|    CLERK|7782|1

In [25]:
spark.sql("select * from empview where sal > 2000").show()

+-----+-----+---------+----+----------+----+----+------+
|empno|ename|      job| mgr|  hiredate| sal|comm|deptno|
+-----+-----+---------+----+----------+----+----+------+
| 7566|JONES|  MANAGER|7839|1981-03-02|2975|null|    20|
| 7698|BLAKE|  MANAGER|7839|1981-05-01|2850|null|    30|
| 7782|CLARK|  MANAGER|7839|1981-09-06|2450|null|    10|
| 7788|SCOTT|  ANALYST|7566|1982-12-08|3000|null|    20|
| 7839| KING|PRESIDENT|null|1981-11-17|5000|null|    10|
| 7902| FORD|  ANALYST|7566|1981-12-13|3000|null|    20|
+-----+-----+---------+----+----------+----+----+------+



In [26]:
spark.sql("select deptno, sum(sal), max(sal) from empview group by deptno").show()

+------+--------+--------+
|deptno|sum(sal)|max(sal)|
+------+--------+--------+
|    20|   10875|    3000|
|    10|    8750|    5000|
|    30|    9400|    2850|
+------+--------+--------+



In [27]:
spark.sql("select * from empview where sal > 2000").show()

+-----+-----+---------+----+----------+----+----+------+
|empno|ename|      job| mgr|  hiredate| sal|comm|deptno|
+-----+-----+---------+----+----------+----+----+------+
| 7566|JONES|  MANAGER|7839|1981-03-02|2975|null|    20|
| 7698|BLAKE|  MANAGER|7839|1981-05-01|2850|null|    30|
| 7782|CLARK|  MANAGER|7839|1981-09-06|2450|null|    10|
| 7788|SCOTT|  ANALYST|7566|1982-12-08|3000|null|    20|
| 7839| KING|PRESIDENT|null|1981-11-17|5000|null|    10|
| 7902| FORD|  ANALYST|7566|1981-12-13|3000|null|    20|
+-----+-----+---------+----+----------+----+----+------+



In [28]:
spark.sql("select * from empview order by sal desc").show()

+-----+------+---------+----+----------+----+----+------+
|empno| ename|      job| mgr|  hiredate| sal|comm|deptno|
+-----+------+---------+----+----------+----+----+------+
| 7839|  KING|PRESIDENT|null|1981-11-17|5000|null|    10|
| 7788| SCOTT|  ANALYST|7566|1982-12-08|3000|null|    20|
| 7902|  FORD|  ANALYST|7566|1981-12-13|3000|null|    20|
| 7566| JONES|  MANAGER|7839|1981-03-02|2975|null|    20|
| 7698| BLAKE|  MANAGER|7839|1981-05-01|2850|null|    30|
| 7782| CLARK|  MANAGER|7839|1981-09-06|2450|null|    10|
| 7499| ALLEN| SALESMAN|7698|1981-02-20|1600| 300|    30|
| 7844|TURNER| SALESMAN|7698|1984-10-08|1500|null|    30|
| 7934|MILLER|    CLERK|7782|1982-01-25|1300|null|    10|
| 7654|MARTIN| SALESMAN|7698|1981-10-22|1250|1400|    30|
| 7521|  WARD| SALESMAN|7698|1981-02-03|1250| 500|    30|
| 7876| ADAMS|    CLERK|7788|1983-01-12|1100|null|    20|
| 7900| JAMES|    CLERK|7698|1981-12-03| 950|null|    30|
| 7369| SMITH|    CLERK|7902|1980-12-17| 800|null|    20|
+-----+------+

In [29]:
spark.sql("select * from empview order by sal desc").take(1)

[Row(empno=7839, ename='KING', job='PRESIDENT', mgr=None, hiredate='1981-11-17', sal=5000, comm=None, deptno=10)]

In [30]:
spark.sql("select * from empview order by sal desc").take(1)[0][1]

'KING'

![이미지](images/spark_df.png)

## Row 객체

In [31]:
from pyspark.sql import Row
row=Row("James",40)
print(row[0] +","+str(row[1]))

James,40


In [32]:
row=Row(name="Alice", age=11)
print(row.name)

Alice


In [33]:
Person = Row("name", "age")
p1=Person("James", 40)
p2=Person("Alice", 35)
print(p1.name +","+p2.name)

James,Alice


In [34]:
from pyspark.sql import Row

data = [Row(name="James,,Smith",lang=["Java","Scala","C++"],state="CA"), 
    Row(name="Michael,Rose,",lang=["Spark","Java","C++"],state="NJ"),
    Row(name="Robert,,Williams",lang=["CSharp","VB"],state="NV")]
rdd=spark.sparkContext.parallelize(data)
print(rdd.collect())

[Row(name='James,,Smith', lang=['Java', 'Scala', 'C++'], state='CA'), Row(name='Michael,Rose,', lang=['Spark', 'Java', 'C++'], state='NJ'), Row(name='Robert,,Williams', lang=['CSharp', 'VB'], state='NV')]


In [35]:
collData=rdd.collect()
for row in collData:
    print(row.name + "," +str(row.lang))

James,,Smith,['Java', 'Scala', 'C++']
Michael,Rose,,['Spark', 'Java', 'C++']
Robert,,Williams,['CSharp', 'VB']


## 날짜데이터를 처리하자

In [36]:
import pyspark.sql.functions as f

In [37]:
l1 = [('2019-05-22',342),('2020-06-02',334),('2019-09-30',269),('2020-10-10',342),('2020-12-25',342)]
dfl1 =  spark.createDataFrame(l1).toDF("dates","sum")
dfl1.show()

+----------+---+
|     dates|sum|
+----------+---+
|2019-05-22|342|
|2020-06-02|334|
|2019-09-30|269|
|2020-10-10|342|
|2020-12-25|342|
+----------+---+



In [None]:
from pyspark.sql.functions import col
dfl2 = dfl1.withColumn('years',f.year(f.to_timestamp('dates', 'yyyy-MM-dd')))
dfl2 = dfl2.withColumn("month",f.month(f.to_timestamp('dates', 'yyyy-MM-dd')))
dfl2 = dfl2.withColumn("dayofmonth",f.dayofmonth(f.to_timestamp('dates', 'yyyy-MM-dd')))
dfl2.show()

In [None]:
dfl2 = dfl1.withColumn('years',f.year(f.to_timestamp('dates')))
dfl2 = dfl2.withColumn("month",f.month(f.to_timestamp('dates')))
dfl2 = dfl2.withColumn("dayofmonth",f.dayofmonth(f.to_timestamp('dates')))
dfl2.show()

In [None]:
dfl2.groupBy('years').sum('sum').show()

## NoneType 필터링
### pyspark에서 drop method는 NULL을 가진 행을 제거하는데 가장 간단한 함수다. 

### [drop 메소드에 인수]
### any: 모든 행의 컬럼값 중 하나라도 NULL의 값을 가지면 해당 행을 제거
### all: 모든 컬럼 값이 NULL이거나 NaN인 경우에만 해당 행을 제거

In [None]:
import pyspark.sql.functions as f


In [None]:
df = spark.createDataFrame([
    (1,'A','X1'),(2,None,'X2'),(2,'B','X2'),(2,'','X1'),(None,'','X3'),(1,'C','X1'),(2,None,'X1'),(2,'D',None),(None,None,None)
], ["ID", "TYPE", "CODE"])
df.show()

In [None]:
df.na.drop('any').show()

In [None]:
df.na.drop('all').show()

In [None]:
df.na.drop('all', subset=['TYPE', 'CODE']).show()

In [None]:
df.na.drop('any', subset=['TYPE', 'CODE']).show()

In [None]:
df.show()

In [None]:
from decimal import Decimal

data = [{"Category": 'Category A', "ID": 1, "Value": Decimal(12.40)},
        {"Category": 'Category B', "ID": 2, "Value": Decimal(30.10)},
        {"Category": 'Category C', "ID": 3, "Value": None},
        {"Category": 'Category D', "ID": 4, "Value": Decimal(1.0)},
        ]

# Create data frame
df = spark.createDataFrame(data)
df.show()

In [None]:
from decimal import Decimal

data = [Row(Category='Category A', ID=1, Value= Decimal(12.40)),
        Row(Category='Category B', ID=2, Value= Decimal(30.10)),
        Row(Category='Category C', ID=3, Value= None),
        Row(Category='Category D', ID=4, Value= Decimal(1.0)),
        ]

# Create data frame
df = spark.createDataFrame(data)
df.show()

In [None]:
df.filter("Value is not null").show()

In [None]:
df.where("Value is null").show()

In [None]:
df.filter(df['Value'].isNull()).show()

In [None]:
df.where(df.Value.isNotNull()).show()

## 날짜타입 데이터 처리

In [None]:
emp = spark.read.csv("data/emp.csv", header=True, inferSchema=True)

In [None]:
emp.columns

In [None]:
emp.dtypes

In [None]:
from pyspark.sql.functions import col
newemp = emp.withColumn("hiredate",col("hiredate").cast("Date"))
newemp.printSchema()

In [None]:
newemp.select(f.year(newemp["hiredate"])).show()

In [None]:
newemp.select(f.month(newemp["hiredate"])).show()

In [None]:
newemp.select(f.dayofmonth(newemp["hiredate"])).show()

### 임시뷰를 활용한 SQL 데이터 처리 복습

In [None]:
emp.createOrReplaceTempView("empview")

In [None]:
sparkdf = spark.sql("select * from empview")
print(type(sparkdf))
sparkdf.show()

In [None]:
spark.sql("select * from empview where sal > 2000").show()

In [None]:
spark.sql("select deptno, sum(sal), max(sal) from empview group by deptno").show()

In [None]:
spark.sql("select * from empview where sal > 2000").show()

In [None]:
spark.sql("select * from empview order by sal desc").show()

In [None]:
spark.sql("select * from empview order by sal desc").take(1)

In [None]:
spark.sql("select * from empview order by sal desc").take(1)[0][1]

## 날짜타입 데이터 처리

In [None]:
flightData2015 = spark\
  .read\
  .option("inferSchema", "true")\
  .option("header", "true")\
  .csv("data/flight-data/csv/2015-summary.csv")

In [None]:
flightData2015.createOrReplaceTempView("flight_data_2015")

In [None]:
sqlWay = spark.sql("""
SELECT DEST_COUNTRY_NAME, count(1)
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
""")
sqlWay.show()

In [None]:
dataFrameWay = flightData2015\
  .groupBy("DEST_COUNTRY_NAME")\
  .count()
dataFrameWay.show()

In [None]:
from pyspark.sql.functions import max

flightData2015.select(max("count")).take(1)

In [None]:
maxSql = spark.sql("""
SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
FROM flight_data_2015
GROUP BY DEST_COUNTRY_NAME
ORDER BY sum(count) DESC
LIMIT 5
""")

maxSql.show()

In [None]:
from pyspark.sql.functions import desc

flightData2015\
  .groupBy("DEST_COUNTRY_NAME")\
  .sum("count")\
  .withColumnRenamed("sum(count)", "destination_total")\
  .sort(desc("destination_total"))\
  .limit(5)\
  .show()

## 다중 파일도 한방에 읽을 수 있지요...

In [None]:
staticDataFrame = spark.read.format("csv")\
  .option("header", "true")\
  .option("inferSchema", "true")\
  .load("data/retail-data/by-day/*.csv")

staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema

In [None]:
staticSchema

In [None]:
staticDataFrame.count()

In [None]:
spark.sql("select * from retail_data").show()

In [None]:
spark.sql("select * from retail_data where InvoiceDate > ''").show()

## 윈도우함수(랭킹함수) 활용

In [None]:
simpleData = (("James", "Sales", 3000), \
    ("Michael", "Sales", 4600),  \
    ("Robert", "Sales", 4100),   \
    ("Maria", "Finance", 3000),  \
    ("Scott", "Finance", 3300),  \
    ("Jen", "Finance", 3900),    \
    ("Jeff", "Marketing", 3000), \
    ("Kumar", "Marketing", 2000),\
    ("Saif", "Sales", 4100) \
  )
 
columns= ["employee_name", "department", "salary"]
df = spark.createDataFrame(data = simpleData, schema = columns)
df.printSchema()
df.show(truncate=False)

In [None]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
windowSpec  = Window.partitionBy("department").orderBy("salary")

df.withColumn("row_number",row_number().over(windowSpec)) \
    .show(truncate=False)

In [None]:
from pyspark.sql.functions import rank
df.withColumn("rank",rank().over(windowSpec)) \
    .show()

In [None]:
from pyspark.sql.functions import dense_rank
df.withColumn("dense_rank",dense_rank().over(windowSpec)) \
    .show()

## 웹사이트에서 데이터 읽어오기

In [None]:
from pyspark import SparkFiles

spark.sparkContext.addFile("https://raw.githubusercontent.com/guru99-edu/R-Programming/master/adult_data.csv")
df = spark.read.csv(SparkFiles.get("adult_data.csv"), header=True, inferSchema=True)

In [None]:
df.printSchema ()

In [None]:
df.show(5, truncate = False)

In [None]:
df.select('age','fnlwgt').show(5)

In [None]:
df.groupBy("education").count().sort("count",ascending=True).show()

In [None]:
df.describe().show()

In [None]:
df.describe('capital_gain').show()

In [None]:
df.filter(df.age > 40).count()

## 다양한 집계(aggregation) 함수들

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import approx_count_distinct,collect_list
from pyspark.sql.functions import collect_set,sum,avg,max,countDistinct,count
from pyspark.sql.functions import first, last, kurtosis, min, mean, skewness 
from pyspark.sql.functions import stddev, stddev_samp, stddev_pop, sumDistinct
from pyspark.sql.functions import variance,var_samp,  var_pop

simpleData = [
    ("Michael", "Sales", 4600),
    ("Robert", "Sales", 4100),
    ("Maria", "Finance", 3000),
    ("James", "Sales", 3000),
    ("Scott", "Finance", 3300),
    ("Jen", "Finance", 3900),
    ("Jeff", "Marketing", 3000),
    ("Kumar", "Marketing", 2000),
    ("Saif", "Sales", 4100)
  ]
schema = ["employee_name", "department", "salary"]
  
df = spark.createDataFrame(data=simpleData, schema = schema)
df.printSchema()
df.show(truncate=False)

print("approx_count_distinct: " + \
      str(df.select(approx_count_distinct("salary")).collect()[0][0]))

print("avg: " + str(df.select(avg("salary")).collect()[0][0]))

df.select(collect_list("salary")).show(truncate=False)

df.select(collect_set("salary")).show(truncate=False)

df2 = df.select(countDistinct("department", "salary"))
df2.show(truncate=False)
print("Distinct Count of Department & Salary: "+str(df2.collect()[0][0]))

print("count: "+str(df.select(count("salary")).collect()[0]))
df.select(first("salary")).show(truncate=False)
df.select(last("salary")).show(truncate=False)
df.select(kurtosis("salary")).show(truncate=False)
df.select(max("salary")).show(truncate=False)
df.select(min("salary")).show(truncate=False)
df.select(mean("salary")).show(truncate=False)
df.select(skewness("salary")).show(truncate=False)
df.select(stddev("salary"), stddev_samp("salary"), \
    stddev_pop("salary")).show(truncate=False)
df.select(sum("salary")).show(truncate=False)
df.select(sumDistinct("salary")).show(truncate=False)
df.select(variance("salary"),var_samp("salary"),var_pop("salary")) \
  .show(truncate=False)

## UDF(User Defined Function) 활용

In [None]:
emp

In [None]:
def detQuarter(sal):
    Q = 'E'
    if(sal > 4000):
        Q = 'A'
    elif(sal > 3000):
        Q = 'B'
    elif(sal > 2000):
        Q = 'C'
    elif(sal > 1000):
        Q = 'D'
    return Q

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

grade = udf(detQuarter, StringType())

In [None]:
newemp = emp.withColumn("grade", grade('sal'))
newemp.show()

In [None]:
columns = ["Seqno","Name"]
data = [("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")]

df = spark.createDataFrame(data=data,schema=columns)

df.show(truncate=False)

In [None]:
def convertCase(str):
    resStr=""
    arr = str.split(" ")
    for x in arr:
       resStr= resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr 

convertUDF = udf(lambda z: convertCase(z))

df.select(col("Seqno"), \
    convertUDF(col("Name")).alias("Name") ) \
.show(truncate=False)