# 1. 파이스파크와 판다스 연계하기 

## 판다스 모듈

In [1]:
import pandas as pd
import numpy as np

## 파이스파크에서 판다스 api 활용 

- pyspark.pandas 모듈은 PySpark와 Pandas를 통합하는 데 사용되는 도구입니다.
- 이 모듈은 PySpark의 DataFrame과 Pandas의 DataFrame 간의 변환을 용이하게 하고, PySpark에서 Pandas 함수를 실행하고 Pandas에서 PySpark 함수를 실행할 수 있게 해줍니다.

## 주요 특징 

### 상호 운용성(Interoperability):
- PySpark와 Pandas는 각자의 장점과 한계를 갖고 있습니다. 
- PySpark는 대규모 데이터 처리와 분산 컴퓨팅에 뛰어나지만, 작은 데이터셋에 대한 데이터 조작은 Pandas가 더 편리합니다. 
- pyspark.pandas 모듈은 이러한 두 환경 간의 데이터 이동과 변환을 원활하게 처리하여 개발자가 더 편리하게 작업할 수 있도록 합니다.

### Pandas 기능의 활용: 
- Pandas는 데이터 조작 및 분석에 매우 풍부한 기능을 제공합니다. 
- pyspark.pandas를 사용하면 PySpark 데이터프레임에서 Pandas 함수를 사용할 수 있으므로, Pandas의 다양한 기능을 활용하여 데이터를 탐색하고 가공할 수 있습니다.

### 대화형 분석(Interactive Analysis): 
- 데이터 분석 작업에서는 대화형 분석이 중요합니다. 
- pyspark.pandas는 PySpark 데이터프레임을 Pandas 데이터프레임으로 변환하여 개발자가 대화형으로 데이터를 탐색하고 시각화하는 데 도움을 줍니다.

### 문제 해결 및 튜닝(Troubleshooting and Tuning): 
- 데이터 처리 및 분석 작업에서 성능 문제를 해결하거나 튜닝할 때 pyspark.pandas를 사용하여 데이터를 더 쉽게 검사하고 디버깅할 수 있습니다.

## 파이스파크에서 판다스 모듈 사용하기 

In [3]:
import pyspark.pandas as ps

## 스파크 세션 처리 

In [4]:
from pyspark.sql import SparkSession

In [6]:
spark = (SparkSession.builder
           .master("local[*]")
           .config("spark.driver.host","127.0.0.1") 
           .config("spark.driver.bindAddress","127.0.0.1")
           .appName("PySpark_basic_2")
           .getOrCreate())

23/09/13 02:29:59 WARN SparkContext: Another SparkContext is being constructed (or threw an exception in its constructor). This may indicate an error, since only one SparkContext should be running in this JVM (see SPARK-2243). The other SparkContext was created at:
org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
py4j.Gateway.invoke(Gateway.java:238)
py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
py4j.commands.Con

In [7]:
spark

## 1-1 판다스와 파이스파크의 판다스 모듈 비교 :시리즈 클래스 

### 시리즈 객체 생성 : 파이스파크 판다스 

In [8]:
ps_s = ps.Series([1, 3, 5, np.nan, 6, 8])

In [9]:
ps_s

                                                                                

0    1.0
1    3.0
2    5.0
3    NaN
4    6.0
5    8.0
dtype: float64

### 시리즈 객체 생성 : 판다스 

In [10]:
s = pd.Series([1, 3, 5, np.nan, 6, 8])

In [11]:
s

0    1.0
1    3.0
2    5.0
3    NaN
4    6.0
5    8.0
dtype: float64

### 두 객체의 자료형을 확인하기 

In [12]:
type(ps_s), type(s)

(pyspark.pandas.series.Series, pandas.core.series.Series)

### 주요 메서드 처리하기 

In [13]:
s.value_counts()

1.0    1
3.0    1
5.0    1
6.0    1
8.0    1
dtype: int64

In [14]:
ps_s.value_counts()

1.0    1
3.0    1
5.0    1
6.0    1
8.0    1
dtype: int64

In [15]:
ss = ps_s.to_pandas()



In [16]:
type(ss)

pandas.core.series.Series

## 1-2  판다스와 파이스파크의 판다스 모듈 비교 : 데이터프레임 

## 판다스 데이터프레임 

In [17]:
df = pd.DataFrame(
    {'a': [1, 2, 3, 4, 5, 6],
     'b': [100, 200, 300, 400, 500, 600],
     'c': ["one", "two", "three", "four", "five", "six"]},
    index=[10, 20, 30, 40, 50, 60])

## 파이스파크 판다스의 데이터프레임 

In [18]:
ps_df = ps.DataFrame(
    {'a': [1, 2, 3, 4, 5, 6],
     'b': [100, 200, 300, 400, 500, 600],
     'c': ["one", "two", "three", "four", "five", "six"]},
    index=[10, 20, 30, 40, 50, 60])

### 자료형 확인 

In [19]:
type(ps_df), type(df)

(pyspark.pandas.frame.DataFrame, pandas.core.frame.DataFrame)

### 데이터 확인 

In [20]:
ps_df

Unnamed: 0,a,b,c
10,1,100,one
20,2,200,two
30,3,300,three
40,4,400,four
50,5,500,five
60,6,600,six


In [21]:
df

Unnamed: 0,a,b,c
10,1,100,one
20,2,200,two
30,3,300,three
40,4,400,four
50,5,500,five
60,6,600,six


### 파이썬 판다스로 변환 

In [22]:
ps_df.to_pandas()



Unnamed: 0,a,b,c
10,1,100,one
20,2,200,two
30,3,300,three
40,4,400,four
50,5,500,five
60,6,600,six


## 1-3 판다스에서 파이스파크 내의 판다스로 변환

### 판다스로 날짜 데이터 범위 처리 

In [23]:
dates = pd.date_range('20130101', periods=6)

In [24]:
dates

DatetimeIndex(['2013-01-01', '2013-01-02', '2013-01-03', '2013-01-04',
               '2013-01-05', '2013-01-06'],
              dtype='datetime64[ns]', freq='D')

### 판다스 데이터프레임 생성할 때 인덱스와 칼럼 지정하기 

In [25]:
pdf = pd.DataFrame(np.random.randn(6, 4), index=dates, columns=list('ABCD'))

In [26]:
type(pdf)

pandas.core.frame.DataFrame

In [27]:
pdf

Unnamed: 0,A,B,C,D
2013-01-01,-0.227944,0.41305,1.156967,0.373436
2013-01-02,1.803409,1.104533,0.039567,0.594882
2013-01-03,0.977497,-1.369193,0.191413,-0.465368
2013-01-04,2.784228,-0.617891,-0.013584,1.783281
2013-01-05,-0.393797,1.392801,0.596686,1.64975
2013-01-06,-0.048635,0.640985,-0.527909,-1.075586


### 날짜 데이터 타입에서 에러가 발생해서 문자열로 변경 

In [28]:
pdf.index.dtype

dtype('<M8[ns]')

In [29]:
pdf.index =  pdf.index.astype('str')

In [30]:
pdf.index.dtype

dtype('O')

### 판다스에서 스파크 내의 판다스로 변환하기 

In [31]:
psdf = ps.from_pandas(pdf)

In [32]:
type(psdf)

pyspark.pandas.frame.DataFrame

In [33]:
psdf.head()

Unnamed: 0,A,B,C,D
2013-01-01,-0.227944,0.41305,1.156967,0.373436
2013-01-02,1.803409,1.104533,0.039567,0.594882
2013-01-03,0.977497,-1.369193,0.191413,-0.465368
2013-01-04,2.784228,-0.617891,-0.013584,1.783281
2013-01-05,-0.393797,1.392801,0.596686,1.64975


## 1-4 판다스를 파이스파크로 바로 변경 

### 파이스파크 데이터프레임으로 변환

In [34]:
sdf = spark.createDataFrame(pdf)

In [35]:
sdf.printSchema()

root
 |-- A: double (nullable = true)
 |-- B: double (nullable = true)
 |-- C: double (nullable = true)
 |-- D: double (nullable = true)



In [36]:
sdf.show()

+--------------------+-------------------+--------------------+-------------------+
|                   A|                  B|                   C|                  D|
+--------------------+-------------------+--------------------+-------------------+
| -0.2279440366763597|0.41305036828061065|  1.1569673263685598|  0.373436281134331|
|  1.8034092605543701| 1.1045327742556277| 0.03956720817080143| 0.5948815838198223|
|  0.9774967524542935|-1.3691926436739106|  0.1914130725050146|-0.4653681503642947|
|  2.7842275941122785| -0.617891020670731|-0.01358427784666...| 1.7832808317919835|
|-0.39379710878762786| 1.3928012113129375|  0.5966856311242036|  1.649749749492562|
|-0.04863485287348672|  0.640985168832615| -0.5279093079040785|-1.0755855186683685|
+--------------------+-------------------+--------------------+-------------------+



### 파이스파크의 판다스로 변환

In [37]:
psdf1 = sdf.pandas_api()

In [40]:
type(psdf1)

pyspark.pandas.frame.DataFrame

In [38]:
psdf1.head()

Unnamed: 0,A,B,C,D
0,-0.227944,0.41305,1.156967,0.373436
1,1.803409,1.104533,0.039567,0.594882
2,0.977497,-1.369193,0.191413,-0.465368
3,2.784228,-0.617891,-0.013584,1.783281
4,-0.393797,1.392801,0.596686,1.64975


In [39]:
psdf1.dtypes

A    float64
B    float64
C    float64
D    float64
dtype: object

### 파이스파크 내의 판다스 변환 

In [43]:
ps_df = sdf.to_pandas_on_spark()

In [44]:
type(ps_df)

pyspark.pandas.frame.DataFrame

### 판다스로 변환하기 

In [45]:
pd_df = sdf.toPandas()

In [46]:
type(pd_df)

pandas.core.frame.DataFrame