In [9]:
import pandas as pd
import numpy as np
import pyspark.pandas as ps
from pyspark.sql import SparkSession

## 스파크 내부의 판다스 모듈 사용하기 

In [10]:
# Build a pandas-on-Spark Series; the np.nan entry makes the displayed dtype float64.
# NOTE(review): the SparkUI log line recorded after this cell suggests a Spark
# session is started implicitly here, before any explicit builder call - confirm.
s = ps.Series([1, 3, 5, np.nan, 6, 8])

23/06/06 16:41:17 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [11]:
s

                                                                                

0    1.0
1    3.0
2    5.0
3    NaN
4    6.0
5    8.0
dtype: float64

In [12]:
type(s)

pyspark.pandas.series.Series

In [13]:
s.value_counts()

1.0    1
3.0    1
5.0    1
6.0    1
8.0    1
dtype: int64

## 데이터프레임 처리 

In [14]:
# Demo pandas-on-Spark frame: three columns keyed by an explicit
# integer index (10, 20, ..., 60) rather than the default 0-based one.
psdf = ps.DataFrame(
    data=dict(
        a=[1, 2, 3, 4, 5, 6],
        b=[100, 200, 300, 400, 500, 600],
        c=["one", "two", "three", "four", "five", "six"],
    ),
    index=list(range(10, 70, 10)),
)

In [16]:
type(psdf)

pyspark.pandas.frame.DataFrame

In [15]:
psdf

Unnamed: 0,a,b,c
10,1,100,one
20,2,200,two
30,3,300,three
40,4,400,four
50,5,500,five
60,6,600,six


In [17]:
# Six consecutive calendar days starting 2013-01-01 (daily frequency).
dates = pd.date_range(start='20130101', periods=6, freq='D')

In [18]:
dates

DatetimeIndex(['2013-01-01', '2013-01-02', '2013-01-03', '2013-01-04',
               '2013-01-05', '2013-01-06'],
              dtype='datetime64[ns]', freq='D')

## 판다스에서 파이스파크 내의 판다스로 변환

In [19]:
# 6x4 frame of standard-normal samples, indexed by `dates`, columns A-D.
# A seeded generator is used so the values are identical on every
# Restart & Run All (the unseeded np.random.randn call was not reproducible).
rng = np.random.default_rng(42)
pdf = pd.DataFrame(rng.standard_normal((6, 4)), index=dates, columns=list('ABCD'))

In [21]:
type(pdf)

pandas.core.frame.DataFrame

In [20]:
pdf

Unnamed: 0,A,B,C,D
2013-01-01,-0.380845,0.586259,0.245217,-1.132811
2013-01-02,-0.798265,-0.619576,1.030725,0.256005
2013-01-03,-0.790509,0.825487,1.391985,1.186493
2013-01-04,0.034183,-1.773757,0.512838,-0.971689
2013-01-05,-0.723982,0.369871,-0.718353,0.713086
2013-01-06,-0.801117,-1.551758,0.33607,0.800248


### 날짜(datetime) 타입 인덱스를 그대로 변환하면 에러가 발생해서 인덱스를 문자열로 변경

In [26]:
pdf.index.dtype

dtype('<M8[ns]')

In [32]:
# Replace the datetime index with its string form; the dtype checks around this
# cell show it going from '<M8[ns]' to object.
# NOTE(review): this mutates `pdf` in place, so the frame displayed earlier
# no longer matches the current index dtype.
pdf.index =  pdf.index.astype('str')

In [33]:
pdf.index.dtype

dtype('O')

In [34]:
# Convert the pandas DataFrame into a pandas-on-Spark DataFrame.
# NOTE(review): this rebinds `psdf`, which earlier held the a/b/c demo frame.
psdf = ps.from_pandas(pdf)

In [35]:
type(psdf)

pyspark.pandas.frame.DataFrame

In [36]:
psdf.head()

Unnamed: 0,A,B,C,D
2013-01-01,-0.380845,0.586259,0.245217,-1.132811
2013-01-02,-0.798265,-0.619576,1.030725,0.256005
2013-01-03,-0.790509,0.825487,1.391985,1.186493
2013-01-04,0.034183,-1.773757,0.512838,-0.971689
2013-01-05,-0.723982,0.369871,-0.718353,0.713086


## 판다스를 파이스파크로 바로 변경 

In [37]:
# Obtain a SparkSession handle for the plain PySpark DataFrame API.
# The log line recorded after this cell reports "Using an existing Spark session;
# only runtime SQL configurations will take effect".
# NOTE(review): that suggests the spark.driver.* settings below are not applied
# here; if they matter, build the session before any pandas-on-Spark object is
# created - confirm.
spark = (SparkSession.builder.appName('ml-bank')
                             .config("spark.driver.host","127.0.0.1") 
                             .config("spark.driver.bindAddress","127.0.0.1")
                             .getOrCreate())


23/06/06 17:53:28 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [38]:
# Convert the pandas DataFrame directly into a Spark DataFrame.
# The schema printed afterwards lists only columns A-D, so the pandas index
# (the date strings) does not appear in `sdf`.
sdf = spark.createDataFrame(pdf)

In [40]:
sdf.printSchema()

root
 |-- A: double (nullable = true)
 |-- B: double (nullable = true)
 |-- C: double (nullable = true)
 |-- D: double (nullable = true)



In [43]:
sdf.show()

+-------------------+-------------------+-------------------+-------------------+
|                  A|                  B|                  C|                  D|
+-------------------+-------------------+-------------------+-------------------+
| -0.380844738049631| 0.5862587508679727| 0.2452174572697309| -1.132810963769085|
|-0.7982651875685051|-0.6195759756308072| 1.0307245481672267|0.25600517204991025|
|-0.7905088654229078| 0.8254865629394254| 1.3919845871936645| 1.1864927392909963|
|0.03418307037236635|-1.7737569692393267| 0.5128379070056259|-0.9716887858826811|
|-0.7239818383074599|0.36987053223352295|-0.7183530746510405|  0.713085546436071|
|-0.8011168860350754|-1.5517577502366233|0.33607040508253505| 0.8002484264399615|
+-------------------+-------------------+-------------------+-------------------+



In [41]:
# Wrap the Spark DataFrame as a pandas-on-Spark DataFrame.
# The head() output recorded below shows a fresh 0-based integer index.
psdf1 = sdf.pandas_api()

In [42]:
psdf1.head()

Unnamed: 0,A,B,C,D
0,-0.380845,0.586259,0.245217,-1.132811
1,-0.798265,-0.619576,1.030725,0.256005
2,-0.790509,0.825487,1.391985,1.186493
3,0.034183,-1.773757,0.512838,-0.971689
4,-0.723982,0.369871,-0.718353,0.713086


In [45]:
psdf1.dtypes

A    float64
B    float64
C    float64
D    float64
dtype: object