### Join
+ 여러 테이블을 연결해서 데이터를 검색하는 것
+ 테이블의 결합 기준은 각 테이블에 존재하는 공통 속성임
+ 결합 유형 : inner join, outer join, self join
+ 데이터객체명.join(조인대상, 조인조건, 조인유형)

In [1]:
# Load the employees CSV (header row supplies the column names, types are inferred) and preview it.
emp = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('csv/employees.csv')
)
emp.show(n=5)

+-----------+----------+---------+--------+------------+-------------------+-------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|          HIRE_DATE| JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+--------+------------+-------------------+-------+------+--------------+----------+-------------+
|        100|    Steven|     King|   SKING|515.123.4567|2003-06-17 00:00:00|AD_PRES| 24000|          null|      null|           90|
|        101|     Neena|  Kochhar|NKOCHHAR|515.123.4568|2005-09-21 00:00:00|  AD_VP| 17000|          null|       100|           90|
|        102|       Lex|  De Haan| LDEHAAN|515.123.4569|2001-01-13 00:00:00|  AD_VP| 17000|          null|       100|           90|
|        103| Alexander|   Hunold| AHUNOLD|590.423.4567|2006-01-03 00:00:00|IT_PROG|  9000|          null|       102|           60|
|        104|     Bruce|    Ernst|  BERNST|590.423.4568|2007-05-21 00:00:00|

In [2]:
# Load the departments CSV (header row supplies the column names, types are inferred) and preview it.
dept = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('csv/departments.csv')
)
dept.show(n=5)

+-------------+---------------+----------+-----------+
|DEPARTMENT_ID|DEPARTMENT_NAME|MANAGER_ID|LOCATION_ID|
+-------------+---------------+----------+-----------+
|           10| Administration|       200|       1700|
|           20|      Marketing|       201|       1800|
|           30|     Purchasing|       114|       1700|
|           40|Human Resources|       203|       2400|
|           50|       Shipping|       121|       1500|
+-------------+---------------+----------+-----------+
only showing top 5 rows



In [3]:
# Inner join with an explicit join expression (the form to use when the key
# columns have different names). Both DEPARTMENT_ID columns stay in the result.
join_condition = emp['DEPARTMENT_ID'] == dept['DEPARTMENT_ID']
empdept = emp.join(dept, on=join_condition, how='inner')
empdept.show(n=5)

+-----------+----------+---------+--------+------------+-------------------+-------+------+--------------+----------+-------------+-------------+---------------+----------+-----------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|          HIRE_DATE| JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|DEPARTMENT_ID|DEPARTMENT_NAME|MANAGER_ID|LOCATION_ID|
+-----------+----------+---------+--------+------------+-------------------+-------+------+--------------+----------+-------------+-------------+---------------+----------+-----------+
|        100|    Steven|     King|   SKING|515.123.4567|2003-06-17 00:00:00|AD_PRES| 24000|          null|      null|           90|           90|      Executive|       100|       1700|
|        101|     Neena|  Kochhar|NKOCHHAR|515.123.4568|2005-09-21 00:00:00|  AD_VP| 17000|          null|       100|           90|           90|      Executive|       100|       1700|
|        102|       Lex|  De Haan| LDEHAAN|515.123.4569|2001-01-13 00:00:00

In [4]:
# Inner join on a shared column name; the key column appears only once in the result.
empdept = emp.join(dept, on='DEPARTMENT_ID', how='inner')
empdept.show(n=5)

+-------------+-----------+----------+---------+--------+------------+-------------------+-------+------+--------------+----------+---------------+----------+-----------+
|DEPARTMENT_ID|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|          HIRE_DATE| JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_NAME|MANAGER_ID|LOCATION_ID|
+-------------+-----------+----------+---------+--------+------------+-------------------+-------+------+--------------+----------+---------------+----------+-----------+
|           90|        100|    Steven|     King|   SKING|515.123.4567|2003-06-17 00:00:00|AD_PRES| 24000|          null|      null|      Executive|       100|       1700|
|           90|        101|     Neena|  Kochhar|NKOCHHAR|515.123.4568|2005-09-21 00:00:00|  AD_VP| 17000|          null|       100|      Executive|       100|       1700|
|           90|        102|       Lex|  De Haan| LDEHAAN|515.123.4569|2001-01-13 00:00:00|  AD_VP| 17000|          null|       100|      Executiv

In [5]:
# Show each employee's first name together with the department name.
empdept.select(['FIRST_NAME', 'DEPARTMENT_NAME']).show(n=5)

+----------+---------------+
|FIRST_NAME|DEPARTMENT_NAME|
+----------+---------------+
|    Steven|      Executive|
|     Neena|      Executive|
|       Lex|      Executive|
| Alexander|             IT|
|     Bruce|             IT|
+----------+---------------+
only showing top 5 rows



In [6]:
# Outer join: rows from either side are kept even when there is no match on the other side.
empdept2 = emp.join(dept, on='DEPARTMENT_ID', how='outer')
empdept2.show(n=5)

+-------------+-----------+----------+---------+------+------------------+-------------------+------+------+--------------+----------+------------------+----------+-----------+
|DEPARTMENT_ID|EMPLOYEE_ID|FIRST_NAME|LAST_NAME| EMAIL|      PHONE_NUMBER|          HIRE_DATE|JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|   DEPARTMENT_NAME|MANAGER_ID|LOCATION_ID|
+-------------+-----------+----------+---------+------+------------------+-------------------+------+------+--------------+----------+------------------+----------+-----------+
|          210|       null|      null|     null|  null|              null|               null|  null|  null|          null|      null|        IT Support|      null|       1700|
|          230|       null|      null|     null|  null|              null|               null|  null|  null|          null|      null|       IT Helpdesk|      null|       1700|
|          190|       null|      null|     null|  null|              null|               null|  null|  null|       

                                                                                

In [7]:
# Departments that have no employees at all.
# After the outer join, rows that come only from `dept` have every employee column null.
# Test the employee key (EMPLOYEE_ID) rather than FIRST_NAME, which could be missing
# for a real employee and would then report that employee's department as empty.
empdept2.filter(empdept2.EMPLOYEE_ID.isNull()).select('DEPARTMENT_ID', 'DEPARTMENT_NAME').show()

+-------------+--------------------+
|DEPARTMENT_ID|     DEPARTMENT_NAME|
+-------------+--------------------+
|          210|          IT Support|
|          230|         IT Helpdesk|
|          190|         Contracting|
|          140|  Control And Credit|
|          250|        Retail Sales|
|          120|            Treasury|
|          220|                 NOC|
|          130|       Corporate Tax|
|          240|    Government Sales|
|          160|            Benefits|
|          200|          Operations|
|          170|       Manufacturing|
|          150|Shareholder Services|
|          260|          Recruiting|
|          270|             Payroll|
|          180|        Construction|
+-------------+--------------------+



In [8]:
# Employees that do not belong to any department (DEPARTMENT_ID is null after the outer join).
empdept2.where(empdept2['DEPARTMENT_ID'].isNull()).select(['FIRST_NAME', 'LAST_NAME']).show()

+----------+---------+
|FIRST_NAME|LAST_NAME|
+----------+---------+
| Kimberely|    Grant|
+----------+---------+



### 다른 예제 (고객·주문·제품 데이터)

In [9]:
# Load the customers, orders and products CSV files (header row + inferred column types).
customers = spark.read.options(header=True, inferSchema=True).csv('csv/Customers.csv')
orders = spark.read.options(header=True, inferSchema=True).csv('csv/Orders.csv')
products = spark.read.options(header=True, inferSchema=True).csv('csv/Products.csv')

In [10]:
# Name and price of the products ordered by customer 'carrot'.
prododr = orders.join(products, on='prodid', how='inner')
prododr.where(prododr['userid'] == 'carrot').select(['prodname', 'price']).show()

+--------+-----+
|prodname|price|
+--------+-----+
|매운쫄면| 5500|
|쿵떡파이| 2600|
+--------+-----+



In [11]:
# Customers who have never placed an order: id, name and grade.
# The right outer join keeps every customer; orderid is null when no order matched.
custodr = orders.join(customers, on='userid', how='right_outer')
custodr.where(custodr['orderid'].isNull()).select(['userid', 'name', 'grade']).show()

+------+------+------+
|userid|  name| grade|
+------+------+------+
|orange|김용욱|silver|
| peach|오형준|silver|
+------+------+------+



In [12]:
# Products that have never been ordered: product name and maker.
# The right outer join keeps every product; orderid is null when no order matched.
prododr2 = orders.join(products, on='prodid', how='right_outer')
prododr2.where(prododr2['orderid'].isNull()).select(['prodname', 'maker']).show()

+----------+--------+
|  prodname|   maker|
+----------+--------+
|  얼큰라면|대한식품|
|달콤비스켓|한빛제과|
+----------+--------+



### 각 사원들의 상사(사번과 이름)를 조회하세요.
### 같은 이름의 테이블을 결합(self join)해서 원하는 데이터를 조회하는 것

In [13]:
from pyspark.sql.functions import col

emp = spark.read.options(header=True, inferSchema=True).csv('csv/employees.csv')

# Self join: alias the same DataFrame twice so each side can be referenced separately,
# then match every employee's MANAGER_ID to the manager's EMPLOYEE_ID.
empmgr = emp.alias('emp').join(
    emp.alias('mgr'),
    on=col('emp.MANAGER_ID') == col('mgr.EMPLOYEE_ID'),
    how='inner',
)
empmgr.show(n=5)

+-----------+----------+---------+--------+------------+-------------------+-------+------+--------------+----------+-------------+-----------+----------+---------+-------+------------+-------------------+-------+------+--------------+----------+-------------+
|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|   EMAIL|PHONE_NUMBER|          HIRE_DATE| JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|EMPLOYEE_ID|FIRST_NAME|LAST_NAME|  EMAIL|PHONE_NUMBER|          HIRE_DATE| JOB_ID|SALARY|COMMISSION_PCT|MANAGER_ID|DEPARTMENT_ID|
+-----------+----------+---------+--------+------------+-------------------+-------+------+--------------+----------+-------------+-----------+----------+---------+-------+------------+-------------------+-------+------+--------------+----------+-------------+
|        101|     Neena|  Kochhar|NKOCHHAR|515.123.4568|2005-09-21 00:00:00|  AD_VP| 17000|          null|       100|           90|        100|    Steven|     King|  SKING|515.123.4567|2003-06-17 00:00:00|AD_PRES| 240

In [14]:
# Employee id and first name next to the manager's id and first name.
empmgr.select(
    col('emp.EMPLOYEE_ID'),
    col('emp.FIRST_NAME'),
    col('mgr.EMPLOYEE_ID'),
    col('mgr.FIRST_NAME'),
).show()

+-----------+-----------+-----------+----------+
|EMPLOYEE_ID| FIRST_NAME|EMPLOYEE_ID|FIRST_NAME|
+-----------+-----------+-----------+----------+
|        101|      Neena|        100|    Steven|
|        102|        Lex|        100|    Steven|
|        103|  Alexander|        102|       Lex|
|        104|      Bruce|        103| Alexander|
|        105|      David|        103| Alexander|
|        106|      Valli|        103| Alexander|
|        107|      Diana|        103| Alexander|
|        108|      Nancy|        101|     Neena|
|        109|     Daniel|        108|     Nancy|
|        110|       John|        108|     Nancy|
|        111|     Ismael|        108|     Nancy|
|        112|Jose Manuel|        108|     Nancy|
|        113|       Luis|        108|     Nancy|
|        114|        Den|        100|    Steven|
|        115|  Alexander|        114|       Den|
|        116|     Shelli|        114|       Den|
|        117|      Sigal|        114|       Den|
|        118|       

### Spark SQL
- 스파크 데이터프레임에 저장된 데이터들을 SQL 문법을 이용해서 탐색할 수 있도록 해줌
- spark.sql() 함수 사용
- OLTP 보다는 OLAP 처리에 적합

In [15]:
# Create (or reuse) the Spark session used for Spark SQL.
# 'app' is not a valid Spark master URL, so run in local mode on all available cores.
# getOrCreate() hands back an already-running session if there is one.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]').appName('sparkSQL').getOrCreate()

In [16]:
# Register the employees DataFrame as the temporary view "EMP" so SQL can query it.
# createOrReplaceTempView() returns None, so its result is not assigned to a variable.
emp.createOrReplaceTempView("EMP")

In [17]:
# Run a SQL query against the EMP view with spark.sql() and print the first rows.
spark.sql(
    'select FIRST_NAME, LAST_NAME from EMP'
).show(n=5)

+----------+---------+
|FIRST_NAME|LAST_NAME|
+----------+---------+
|    Steven|     King|
|     Neena|  Kochhar|
|       Lex|  De Haan|
| Alexander|   Hunold|
|     Bruce|    Ernst|
+----------+---------+
only showing top 5 rows



21/11/06 09:34:35 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException


### Spark_DSL_SQL

In [18]:
# Load the customers, orders and products CSV files (header row + inferred column types).
customers = spark.read.options(header=True, inferSchema=True).csv('csv/Customers.csv')
orders = spark.read.options(header=True, inferSchema=True).csv('csv/Orders.csv')
products = spark.read.options(header=True, inferSchema=True).csv('csv/Products.csv')

In [19]:
# Create (or reuse) the Spark session used for Spark SQL.
# 'app' is not a valid Spark master URL, so run in local mode on all available cores.
# getOrCreate() hands back an already-running session if there is one.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master('local[*]').appName('sparkSQL').getOrCreate()

In [20]:
# Register each DataFrame as a short-named temporary view for SQL queries.
# createOrReplaceTempView() returns None, so its result is not assigned to a variable.
customers.createOrReplaceTempView("cust")
orders.createOrReplaceTempView("orde")
products.createOrReplaceTempView("prod")

In [21]:
# Pre-join the DataFrames.
## customers + orders
custorder = customers.join(orders, 'userid', 'inner')
## orders + products
oderprod = orders.join(products, 'prodid', 'inner')
## customers + orders + products
custordeprod = custorder.join(products, 'prodid', 'inner')

# Register the joined DataFrames as temporary views.
# createOrReplaceTempView() returns None, so its result is not assigned to a variable.
custorder.createOrReplaceTempView("cuor")
oderprod.createOrReplaceTempView("orpr")
custordeprod.createOrReplaceTempView("cuorpr")

### SQL 사용하는 View
+ create view 뷰이름 as SQL질의문
 - create view prodorder as select * from products p inner join orders o using(prodid);
 - select * from prodorder;

### 예제

In [22]:
# Exercise: in the sales database, find the names of the products ordered by customer "banana".
# NOTE(review): this query only returns banana's row from the cust view, not the ordered
# product names; answering the exercise needs a join with the orde/prod views (or the
# pre-joined cuorpr view) and a select on prodname.
spark.sql('select * from cust where userid = "banana"').show()

+------+------+---+-----+------+-----+
|userid|  name|age|grade|   job|coins|
+------+------+---+-----+------+-----+
|banana|김선우| 25|  vip|간호사| 2500|
+------+------+---+-----+------+-----+



In [23]:
# Sales database: for customers aged 30 or older, list the ordered product and the order date.
# SQL substring positions are 1-based, so start at 1 to take the leading date part of orddate.
sql = """ select userid, name, age, substring(orddate,1,10) oderdate, prodname from cust c inner join orde o using(userid) inner join prod p using(prodid) where age >= 30 """
spark.sql(sql).show()

+------+------+---+----------+--------+
|userid|  name|age|  oderdate|prodname|
+------+------+---+----------+--------+
| melon|성원용| 35|2013-01-10|그냥만두|
| melon|성원용| 35|2013-02-20|통통우동|
|  pear|채광주| 31|2013-04-10|매운쫄면|
+------+------+---+----------+--------+



In [24]:
# Same query as the three-way join, run against the pre-joined cuorpr view.
# SQL substring positions are 1-based, so start at 1 to take the leading date part of orddate.
sql = """ select userid, name, age, substring(orddate,1,10) oderdate, prodname from cuorpr where age >= 30 """
spark.sql(sql).show()

+------+------+---+----------+--------+
|userid|  name|age|  oderdate|prodname|
+------+------+---+----------+--------+
| melon|성원용| 35|2013-01-10|그냥만두|
| melon|성원용| 35|2013-02-20|통통우동|
|  pear|채광주| 31|2013-04-10|매운쫄면|
+------+------+---+----------+--------+



In [25]:
# Sales database: find the products ordered by the customer named '고명석'
# by joining the prod, orde and cust views.
sql = (
    " select o.userid, c.name, o.orddate, p.prodname"
    " from prod p inner join orde o using(prodid)"
    " inner join cust c using(userid)"
    " where c.name='고명석' "
)
spark.sql(sql).show()

+------+------+-------------------+--------+
|userid|  name|            orddate|prodname|
+------+------+-------------------+--------+
|carrot|고명석|2013-02-01 00:00:00|매운쫄면|
|carrot|고명석|2013-05-22 00:00:00|쿵떡파이|
+------+------+-------------------+--------+



In [26]:
# Same lookup for '고명석', run against the pre-joined cuorpr view.
sql = (
    " select userid, name, orddate, prodname"
    " from cuorpr"
    " where name='고명석' "
)
spark.sql(sql).show()

+------+------+-------------------+--------+
|userid|  name|            orddate|prodname|
+------+------+-------------------+--------+
|carrot|고명석|2013-02-01 00:00:00|매운쫄면|
|carrot|고명석|2013-05-22 00:00:00|쿵떡파이|
+------+------+-------------------+--------+



### 평균(avg), 합계(sum), count, groupBy sum

In [27]:
# NOTE: this import rebinds the builtins `round` and `sum` to the Spark column functions;
# later cells call bare `sum(...)` on columns and rely on that.
from pyspark.sql.functions import round, avg, sum

# Average product price, rounded to two decimals.
products.select(
    round(avg(products['price']), 2).alias('avg price')
).show()

+---------+
|avg price|
+---------+
|  2764.29|
+---------+



In [28]:
# Average product price, rounded to two decimals, via the prod view.
sql = ' select round(avg(price),2) avgprice from prod '
spark.sql(sql).show()

+--------+
|avgprice|
+--------+
| 2764.29|
+--------+



In [29]:
# Total stock of the products made by '한빛제과' (`sum` here is the Spark column function).
products.where(products['maker'] == '한빛제과').select(sum(products['stock'])).show()

+----------+
|sum(stock)|
+----------+
|      6500|
+----------+



In [30]:
# Total stock of the products made by '한빛제과', via the prod view.
sql = (
    " select sum(stock) allstock"
    " from prod"
    " where maker='한빛제과' "
)
spark.sql(sql).show()

+--------+
|allstock|
+--------+
|    6500|
+--------+



In [31]:
from pyspark.sql.functions import count, col, countDistinct

In [32]:
# Number of customers with a non-null name.
customers.select(count(customers['name']).alias('cnt')).show()

+---+
|cnt|
+---+
|  7|
+---+



In [33]:
# Number of customers with a non-null name, via the cust view.
# Unquoted `name` refers to the column; the quoted 'name' was a string literal,
# which makes count() count every row regardless of the column's value.
sql = """ select count(name) cnt from cust """
spark.sql(sql).show()

+---+
|cnt|
+---+
|  7|
+---+



In [34]:
# Number of distinct makers (the notebook echoes the returned integer).
products.select('maker').distinct().count()

                                                                                

3

In [35]:
# Number of distinct makers, via the prod view.
sql = ' select count(distinct maker) cnt from prod '
spark.sql(sql).show()

+---+
|cnt|
+---+
|  3|
+---+



                                                                                

In [36]:
## Total ordered amount per product, sorted by product id.
orders.groupBy('prodid').sum('amount').sort('prodid').show()

+------+-----------+
|prodid|sum(amount)|
+------+-----------+
|   p01|         24|
|   p02|         58|
|   p03|         52|
|   p04|         15|
|   p06|         81|
+------+-----------+



In [37]:
# Total ordered amount per product, sorted by product id, via the orde view.
sql = (
    " select prodid,sum(amount)"
    " from orde"
    " group by prodid"
    " order by prodid "
)
spark.sql(sql).show()

+------+-----------+
|prodid|sum(amount)|
+------+-----------+
|   p01|         24|
|   p02|         58|
|   p03|         52|
|   p04|         15|
|   p06|         81|
+------+-----------+

