In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [0]:
from datetime import datetime, date
import pandas as pd
from pyspark.sql import Row

df = spark.createDataFrame([
    Row( a = 1, b = 2., c = 'string1', d = date(2000, 1, 1), e = datetime(2000, 1, 1, 12, 0)),
    Row( a = 2, b = 3., c = 'string2', d = date(2000, 2, 1), e = datetime(2000, 2, 1, 12, 0)),
    Row( a = 3, b = 4., c = 'string3', d = date(2000, 3, 1), e = datetime(2000, 3, 2, 12, 0))
])

df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [0]:

# Pyspark DF with explicit schema

df = spark.createDataFrame([
    (1, 2.,'string1',date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3.,'string2',date(2000,2,1), datetime(2000,1,2,12,0)),
    (3,4.,'string3',date(2000,3,1),datetime(2000,1,3,12,0))
], schema = 'a long, b double, c string, d date, e timestamp')

df 

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [0]:
# Pyspark df from pandas df

pandas_df = pd.DataFrame({
    'a':[1, 2, 3],
    'b':[2.,3.,4.],
    'c':['string1','string2','string3'],
    'd':[date(2000,1,1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000,1,1,12,0),
          datetime(2000,1,2,12,0),
          datetime(2000,1,3,12,0)]
})

df = spark.createDataFrame(pandas_df)

df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [0]:
df.show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



In [0]:
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)




# Viewing Data



In [0]:
df.show(1)

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+
only showing top 1 row



In [0]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)

In [0]:
df

a,b,c,d,e
1,2.0,string1,2000-01-01,2000-01-01 12:00:00
2,3.0,string2,2000-02-01,2000-01-02 12:00:00
3,4.0,string3,2000-03-01,2000-01-03 12:00:00


In [0]:
df.show(1, vertical=True)

-RECORD 0------------------
 a   | 1                   
 b   | 2.0                 
 c   | string1             
 d   | 2000-01-01          
 e   | 2000-01-01 12:00:00 
only showing top 1 row



In [0]:
df.columns

['a', 'b', 'c', 'd', 'e']

In [0]:
df.printSchema()

root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)




Show summary of dataframe

In [0]:
df.select("a","b","c").describe().show()

+-------+---+---+-------+
|summary|  a|  b|      c|
+-------+---+---+-------+
|  count|  3|  3|      3|
|   mean|2.0|3.0|   NULL|
| stddev|1.0|1.0|   NULL|
|    min|  1|2.0|string1|
|    max|  3|4.0|string3|
+-------+---+---+-------+



In [0]:
df.collect()

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),
 Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0)),
 Row(a=3, b=4.0, c='string3', d=datetime.date(2000, 3, 1), e=datetime.datetime(2000, 1, 3, 12, 0))]

In [0]:
df.take(1)

[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0))]

In [0]:
df.toPandas()

Unnamed: 0,a,b,c,d,e
0,1,2.0,string1,2000-01-01,2000-01-01 12:00:00
1,2,3.0,string2,2000-02-01,2000-01-02 12:00:00
2,3,4.0,string3,2000-03-01,2000-01-03 12:00:00



# Selecting and Accessing data

In [0]:
df.a

Column<'a'>

In [0]:
from pyspark.sql import Column
from pyspark.sql.functions import upper 

type(df.c) == type(upper(df.c)) == type(df.c.isNull())

True

In [0]:
df.select(df.c).show()

+-------+
|      c|
+-------+
|string1|
|string2|
|string3|
+-------+



In [0]:
# Assign new column instance 

df.withColumn('upper_c', upper(df.c)).show()

+---+---+-------+----------+-------------------+-------+
|  a|  b|      c|         d|                  e|upper_c|
+---+---+-------+----------+-------------------+-------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|STRING2|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|STRING3|
+---+---+-------+----------+-------------------+-------+



In [0]:
# Select rows subset 

df.filter(df.a == 1).show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+



% md 

# Applying a function 

Pandas UDF's and Pandas Function API's


In [0]:
from pyspark.sql.functions import pandas_udf 

@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    return series + 1 

df.select(pandas_plus_one(df.a)).show()


+------------------+
|pandas_plus_one(a)|
+------------------+
|                 2|
|                 3|
|                 4|
+------------------+



In [0]:

def pandas_filter_func(iterator):
    for pandas_df in iterator:
        yield pandas_df[pandas_df.a == 1]

df.mapInPandas(pandas_filter_func, schema = df.schema).show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
+---+---+-------+----------+-------------------+




# Grouping Data


In [0]:
df = spark.createDataFrame([
    ['red','banana', 1, 10],
    ['black','carrot',3,30],
    ['red','apple',5, 40],
    ['purple','grapes',2,46],
    ['Green','apples',4,36],
    ['Green','grapes',6,70],
    ['red','cherrys',7,25],
    ['purple','plums',8,68],
    ['black','grapes',9,60],
    ['red','watermelons',10,45]
], schema = ['color','fruit','v1','v2'])

df.show()

+------+-----------+---+---+
| color|      fruit| v1| v2|
+------+-----------+---+---+
|   red|     banana|  1| 10|
| black|     carrot|  3| 30|
|   red|      apple|  5| 40|
|purple|     grapes|  2| 46|
| Green|     apples|  4| 36|
| Green|     grapes|  6| 70|
|   red|    cherrys|  7| 25|
|purple|      plums|  8| 68|
| black|     grapes|  9| 60|
|   red|watermelons| 10| 45|
+------+-----------+---+---+



In [0]:
df.groupBy('color').avg().show()

+------+-------+-------+
| color|avg(v1)|avg(v2)|
+------+-------+-------+
|   red|   5.75|   30.0|
| black|    6.0|   45.0|
|purple|    5.0|   57.0|
| Green|    5.0|   53.0|
+------+-------+-------+



In [0]:
# Applying python function against each group by using pandas api 

def plus_mean(pandas_df):
    return pandas_df.assign(v1 = pandas_df.v1 - pandas_df.v1.mean())

df.groupBy('color').applyInPandas(plus_mean, schema = df.schema).show()

+------+-----------+---+---+
| color|      fruit| v1| v2|
+------+-----------+---+---+
| Green|     apples| -1| 36|
| Green|     grapes|  1| 70|
| black|     carrot| -3| 30|
| black|     grapes|  3| 60|
|purple|     grapes| -3| 46|
|purple|      plums|  3| 68|
|   red|     banana| -4| 10|
|   red|      apple|  0| 40|
|   red|    cherrys|  1| 25|
|   red|watermelons|  4| 45|
+------+-----------+---+---+




Co-grouping and applying a function


In [0]:
df1 = spark.createDataFrame(
    [(2000, 1, 1.0),
     (3500, 4, 2.0),
     (7800, 3, 4.5)],
    ('time','id','v1')
)

df2 = spark.createDataFrame(
    [(2000, 1, 'x'),
     (3500, 4, 'y'),
     (7800, 3, 'z')],
    ('time','id','v2')
)

def merge_ordered(l,r):
    return pd.merge_ordered(l,r)

df1.groupby('id').cogroup(df2.groupBy('id')).applyInPandas(merge_ordered,
                                                           schema = 'time int, id int, v1 double, v2 string').show()

+----+---+---+---+
|time| id| v1| v2|
+----+---+---+---+
|2000|  1|1.0|  x|
|7800|  3|4.5|  z|
|3500|  4|2.0|  y|
+----+---+---+---+




# Getting data In/Out

In [0]:
# df.write.csv('/tmp/resources/foo.csv',header=True)

# spark.read.csv('/tmp/resources/foo.csv',header=True).show()

In [0]:
# parquet

# df.write.parquet('/tmp/resources/bar.parquet')
# spark.read.parquet('/tmp/resources/bar.parquet').show()

In [0]:
# ORC

# df.write.orc('/tmp/resources/zoo.orc')

# spark.read.orc('/tmp/resources/zoo.orc').show()


# Working with SQL 


In [0]:
df.createOrReplaceTempView("tableA")

spark.sql("select count(*) from tableA").show()

+--------+
|count(1)|
+--------+
|      10|
+--------+



In [0]:
@pandas_udf("integer")
def add_one(s: pd.Series) -> pd.Series :
    return s + 1

spark.udf.register("add_one",add_one)
spark.sql("select add_one(v1) from tableA").show()

+-----------+
|add_one(v1)|
+-----------+
|          2|
|          4|
|          6|
|          3|
|          5|
|          7|
|          8|
|          9|
|         10|
|         11|
+-----------+



In [0]:
from pyspark.sql.functions import expr 

df.selectExpr('add_one(v1)').show()
df.select(expr('count(*)') > 0).show()

+-----------+
|add_one(v1)|
+-----------+
|          2|
|          4|
|          6|
|          3|
|          5|
|          7|
|          8|
|          9|
|         10|
|         11|
+-----------+

+--------------+
|(count(1) > 0)|
+--------------+
|          true|
+--------------+




# Object creation

In [0]:
import pyspark.pandas as ps 
from pyspark.sql import SparkSession
import numpy as np 

In [0]:
s = ps.Series([1, 3, 5, np.nan, 6, 8])
s

0    1.0
1    3.0
2    5.0
3    NaN
4    6.0
5    8.0
dtype: float64

In [0]:
psdf = ps.DataFrame({
    'a': [1,2,4, 3, 5,6],
    'b':[100,200,300,450,600,500],
    'c':['one',"two","three","four","five","six"]}, index = [10, 20, 30, 40, 50, 60])
psdf

Unnamed: 0,a,b,c
10,1,100,one
20,2,200,two
30,4,300,three
40,3,450,four
50,5,600,five
60,6,500,six



## Working with pandas Datetime

In [0]:
dates = pd.date_range('20130101',periods = 6)

In [0]:
dates

DatetimeIndex(['2013-01-01', '2013-01-02', '2013-01-03', '2013-01-04',
               '2013-01-05', '2013-01-06'],
              dtype='datetime64[ns]', freq='D')

In [0]:
pdf = pd.DataFrame(np.random.randn(6,4), index = dates, columns = list('ABCD'))


pdf 

Unnamed: 0,A,B,C,D
2013-01-01,0.59382,0.174552,1.032826,-0.80572
2013-01-02,-0.709554,-1.873294,0.463508,-1.417828
2013-01-03,-0.475069,-1.294479,0.550302,2.144888
2013-01-04,-3.016154,0.621143,-0.524145,-0.354935
2013-01-05,0.259988,-0.303898,-1.150781,-0.049736
2013-01-06,-1.653114,0.058861,-0.266348,1.015618


In [0]:
psdf = ps.from_pandas(pdf)

psdf

Unnamed: 0,A,B,C,D
2013-01-01,0.59382,0.174552,1.032826,-0.80572
2013-01-02,-0.709554,-1.873294,0.463508,-1.417828
2013-01-03,-0.475069,-1.294479,0.550302,2.144888
2013-01-04,-3.016154,0.621143,-0.524145,-0.354935
2013-01-05,0.259988,-0.303898,-1.150781,-0.049736
2013-01-06,-1.653114,0.058861,-0.266348,1.015618


In [0]:
type(psdf)

pyspark.pandas.frame.DataFrame

In [0]:
psdf

Unnamed: 0,A,B,C,D
2013-01-01,0.59382,0.174552,1.032826,-0.80572
2013-01-02,-0.709554,-1.873294,0.463508,-1.417828
2013-01-03,-0.475069,-1.294479,0.550302,2.144888
2013-01-04,-3.016154,0.621143,-0.524145,-0.354935
2013-01-05,0.259988,-0.303898,-1.150781,-0.049736
2013-01-06,-1.653114,0.058861,-0.266348,1.015618


In [0]:
# creating spark df from pandas df

spark = SparkSession.builder.getOrCreate()

sdf = spark.createDataFrame(pdf)

sdf.show()


+-------------------+-------------------+-------------------+--------------------+
|                  A|                  B|                  C|                   D|
+-------------------+-------------------+-------------------+--------------------+
| 0.5938198350794955|0.17455195894761674| 1.0328258346081232| -0.8057201650964463|
|-0.7095537155916017|-1.8732944321413219|  0.463508136933873| -1.4178280023912393|
|-0.4750693376771988| -1.294479119778514| 0.5503019845725223|  2.1448882780414804|
|-3.0161536039774153| 0.6211431549237827|-0.5241448822092518|-0.35493524961420875|
| 0.2599882477140967| -0.303898317250204|-1.1507805779066573|-0.04973631285505286|
|-1.6531141932408224|0.05886131663879939|-0.2663475746711511|  1.0156182847992248|
+-------------------+-------------------+-------------------+--------------------+



In [0]:
# pandas on spark df

psdf = sdf.pandas_api()

psdf 

Unnamed: 0,A,B,C,D
0,0.59382,0.174552,1.032826,-0.80572
1,-0.709554,-1.873294,0.463508,-1.417828
2,-0.475069,-1.294479,0.550302,2.144888
3,-3.016154,0.621143,-0.524145,-0.354935
4,0.259988,-0.303898,-1.150781,-0.049736
5,-1.653114,0.058861,-0.266348,1.015618


In [0]:
psdf.dtypes 

A    float64
B    float64
C    float64
D    float64
dtype: object

In [0]:
psdf.index

Int64Index([0, 1, 2, 3, 4, 5], dtype='int64')

In [0]:
psdf.head()

Unnamed: 0,A,B,C,D
0,0.59382,0.174552,1.032826,-0.80572
1,-0.709554,-1.873294,0.463508,-1.417828
2,-0.475069,-1.294479,0.550302,2.144888
3,-3.016154,0.621143,-0.524145,-0.354935
4,0.259988,-0.303898,-1.150781,-0.049736


In [0]:
psdf.columns

Index(['A', 'B', 'C', 'D'], dtype='object')

In [0]:
psdf.to_numpy()

array([[ 0.59381984,  0.17455196,  1.03282583, -0.80572017],
       [-0.70955372, -1.87329443,  0.46350814, -1.417828  ],
       [-0.47506934, -1.29447912,  0.55030198,  2.14488828],
       [-3.0161536 ,  0.62114315, -0.52414488, -0.35493525],
       [ 0.25998825, -0.30389832, -1.15078058, -0.04973631],
       [-1.65311419,  0.05886132, -0.26634757,  1.01561828]])

In [0]:
psdf.describe()

Unnamed: 0,A,B,C,D
count,6.0,6.0,6.0,6.0
mean,-0.833347,-0.436186,0.01756,0.088714
std,1.327595,0.954512,0.806538,1.294039
min,-3.016154,-1.873294,-1.150781,-1.417828
25%,-1.653114,-1.294479,-0.524145,-0.80572
50%,-0.709554,-0.303898,-0.266348,-0.354935
75%,0.259988,0.174552,0.550302,1.015618
max,0.59382,0.621143,1.032826,2.144888


In [0]:
psdf.T 

Unnamed: 0,0,1,2,3,4,5
A,0.59382,-0.709554,-0.475069,-3.016154,0.259988,-1.653114
B,0.174552,-1.873294,-1.294479,0.621143,-0.303898,0.058861
C,1.032826,0.463508,0.550302,-0.524145,-1.150781,-0.266348
D,-0.80572,-1.417828,2.144888,-0.354935,-0.049736,1.015618


In [0]:
psdf.sort_index(ascending = False)

Unnamed: 0,A,B,C,D
5,-1.653114,0.058861,-0.266348,1.015618
4,0.259988,-0.303898,-1.150781,-0.049736
3,-3.016154,0.621143,-0.524145,-0.354935
2,-0.475069,-1.294479,0.550302,2.144888
1,-0.709554,-1.873294,0.463508,-1.417828
0,0.59382,0.174552,1.032826,-0.80572


In [0]:
psdf.sort_values(by='B')

Unnamed: 0,A,B,C,D
1,-0.709554,-1.873294,0.463508,-1.417828
2,-0.475069,-1.294479,0.550302,2.144888
4,0.259988,-0.303898,-1.150781,-0.049736
5,-1.653114,0.058861,-0.266348,1.015618
0,0.59382,0.174552,1.032826,-0.80572
3,-3.016154,0.621143,-0.524145,-0.354935



# Missing data

In [0]:
pdf1 = pdf.reindex(index = dates[0:4],
                   columns=list(pdf.columns) + ['E'])

pdf1.loc[dates[0]:dates[1],'E'] = 1

In [0]:
pdf1 

Unnamed: 0,A,B,C,D,E
2013-01-01,0.59382,0.174552,1.032826,-0.80572,1.0
2013-01-02,-0.709554,-1.873294,0.463508,-1.417828,1.0
2013-01-03,-0.475069,-1.294479,0.550302,2.144888,
2013-01-04,-3.016154,0.621143,-0.524145,-0.354935,


In [0]:
psdf1 = ps.from_pandas(pdf1)

psdf1

Unnamed: 0,A,B,C,D,E
2013-01-01,0.59382,0.174552,1.032826,-0.80572,1.0
2013-01-02,-0.709554,-1.873294,0.463508,-1.417828,1.0
2013-01-03,-0.475069,-1.294479,0.550302,2.144888,
2013-01-04,-3.016154,0.621143,-0.524145,-0.354935,


In [0]:
# drop missing values

psdf1.dropna(how='any')

Unnamed: 0,A,B,C,D,E
2013-01-01,0.59382,0.174552,1.032826,-0.80572,1.0
2013-01-02,-0.709554,-1.873294,0.463508,-1.417828,1.0


In [0]:
psdf1.fillna(value=5)

Unnamed: 0,A,B,C,D,E
2013-01-01,0.59382,0.174552,1.032826,-0.80572,1.0
2013-01-02,-0.709554,-1.873294,0.463508,-1.417828,1.0
2013-01-03,-0.475069,-1.294479,0.550302,2.144888,5.0
2013-01-04,-3.016154,0.621143,-0.524145,-0.354935,5.0



# Operations



In [0]:
psdf.mean()

A   -0.833347
B   -0.436186
C    0.017560
D    0.088714
dtype: float64


# Grouping

In [0]:
psdf = ps.DataFrame(
    {
        'A' : ['foo','bar','foo','bar','foo','bar','foo','bar','foo','foo'],
        'B' : ['one','one','two','three','one','two','one','one','one','two'],
        'C': np.random.randn(10),
        'D': np.random.randn(10)
    }
)

In [0]:
psdf

Unnamed: 0,A,B,C,D
0,foo,one,-0.970349,0.775853
1,bar,one,-1.763137,0.551602
2,foo,two,0.233802,-0.361768
3,bar,three,0.926821,-0.21668
4,foo,one,1.932384,-1.10278
5,bar,two,-0.850934,0.374522
6,foo,one,0.146959,0.543828
7,bar,one,-0.225823,0.473678
8,foo,one,-2.421085,0.557798
9,foo,two,-0.324325,-2.247356


In [0]:
psdf.groupby('A').sum()

Unnamed: 0_level_0,C,D
A,Unnamed: 1_level_1,Unnamed: 2_level_1
bar,-1.913073,1.183122
foo,-1.402613,-1.834426


In [0]:
psdf.groupby(['A','B']).sum()

Unnamed: 0_level_0,Unnamed: 1_level_0,C,D
A,B,Unnamed: 2_level_1,Unnamed: 3_level_1
foo,one,-1.312091,0.774699
bar,one,-1.98896,1.02528
foo,two,-0.090522,-2.609124
bar,three,0.926821,-0.21668
bar,two,-0.850934,0.374522



# Transform & apply function 



In [0]:
psdf = ps.DataFrame({'a':[1,2,3],'b': [4,5,6]})

def pandas_plus(pser):
    return pser + 1 

psdf.transform(pandas_plus)

Unnamed: 0,a,b
0,2,5
1,3,6
2,4,7


In [0]:
psdf = ps.DataFrame({'a':[1,2,3],'b': [4,5,6]})

def pandas_plus(pser):
    return pser[pser % 2 == 1]

psdf.apply(pandas_plus)

Unnamed: 0,a,b
0,1.0,
1,,5.0
2,3.0,


In [0]:
psdf = ps.DataFrame({'a':[1,2,3], 'b':[4,5,6]})
def pandas_plus(pser):
    return sum(pser)

psdf.apply(pandas_plus,axis='columns')

0    5
1    7
2    9
dtype: int64

In [0]:
psdf = ps.DataFrame({'a':[1,2,3],'b':[4,5,6]})

def pandas_plus(pdf):
    return pdf + 1

psdf.pandas_on_spark.transform_batch(pandas_plus)


Unnamed: 0,a,b
0,2,5
1,3,6
2,4,7


In [0]:
psdf = ps.DataFrame({'a':[1,2,3],'b':[4,5,6]})

def pandas_plus(pdf):
    return pdf[pdf.a > 1]

psdf.pandas_on_spark.apply_batch(pandas_plus)

Unnamed: 0,a,b
1,2,5
2,3,6


In [0]:
psdf = ps.DataFrame({'a':[1,2,3],'b':[4,5,6]})

def pandas_plus(pser):
    return pser + 1

psdf.a.pandas_on_spark.transform_batch(pandas_plus)

0    2
1    3
2    4
Name: a, dtype: int64


# Type casting 

In [0]:
# spark df

sdf = spark.createDataFrame([
    (1,(1.3), 1.2, 1.3, 1, 1, 1, datetime(2020,10,27), "1", True, datetime(2023,12, 4))],
                            'tinyint tinyint,decimal decimal,float float,double double, integer integer,long long,short short,timestamp timestamp,string string,boolean boolean, date: date')

[0;31m---------------------------------------------------------------------------[0m
[0;31mPySparkTypeError[0m                          Traceback (most recent call last)
File [0;32m<command-1329073826761124>, line 3[0m
[1;32m      1[0m [38;5;66;03m# spark df[39;00m
[0;32m----> 3[0m sdf [38;5;241m=[39m [43mspark[49m[38;5;241;43m.[39;49m[43mcreateDataFrame[49m[43m([49m[43m[[49m
[1;32m      4[0m [43m    [49m[43m([49m[38;5;241;43m1[39;49m[43m,[49m[43m([49m[38;5;241;43m1.3[39;49m[43m)[49m[43m,[49m[43m [49m[38;5;241;43m1.2[39;49m[43m,[49m[43m [49m[38;5;241;43m1.3[39;49m[43m,[49m[43m [49m[38;5;241;43m1[39;49m[43m,[49m[43m [49m[38;5;241;43m1[39;49m[43m,[49m[43m [49m[38;5;241;43m1[39;49m[43m,[49m[43m [49m[43mdatetime[49m[43m([49m[38;5;241;43m2020[39;49m[43m,[49m[38;5;241;43m10[39;49m[43m,[49m[38;5;241;43m27[39;49m[43m)[49m[43m,[49m[43m [49m[38;5;124;43m"[39;49m[38;5;124;43m1[39;49m[38;5;124;43m"[

In [0]:
sdf

int8,bool,float32,float64,int32,int64,int16,datetime,object_string,object_decimal,object_date
1,True,1.0,1.0,1,1,1,2020-10-27 00:00:00,1,1.1,2020-10-27


In [0]:
sdf.dtypes

[('int8', 'tinyint'),
 ('bool', 'boolean'),
 ('float32', 'float'),
 ('float64', 'double'),
 ('int32', 'int'),
 ('int64', 'bigint'),
 ('int16', 'smallint'),
 ('datetime', 'timestamp'),
 ('object_string', 'string'),
 ('object_decimal', 'string'),
 ('object_date', 'date')]

In [0]:
# pandas-on-spark data types

psdf = sdf.pandas_api()

psdf.dtypes 

int8                        int8
bool                        bool
float32                  float32
float64                  float64
int32                      int32
int64                      int64
int16                      int16
datetime          datetime64[ns]
object_string             object
object_decimal            object
object_date               object
dtype: object

In [0]:
psdf = ps.DataFrame({
    "int8" :[1],
    "bool" : [True],
    "float32" : [1.0],
    "float64" : [1.0],
    "int32" : [1],
    "int64" :[1] , 
    "int16" : [1],
    "datetime" : [datetime(2020,10,27)],
    "object_string" : ["1"],
    "object_decimal" : ["1.1"],
    "object_date" : [date(2020,10,27)]
})

psdf

Unnamed: 0,int8,bool,float32,float64,int32,int64,int16,datetime,object_string,object_decimal,object_date
0,1,True,1.0,1.0,1,1,1,2020-10-27,1,1.1,2020-10-27


In [0]:
# type casting pandas-on spark df

psdf['int8'] = psdf['int8'].astype('int8')

psdf['int16'] = psdf['int16'].astype('int16')

psdf['int32'] = psdf['int32'].astype('int32')

psdf['float32'] = psdf['float32'].astype('float32')

psdf.dtypes 

int8                        int8
bool                        bool
float32                  float32
float64                  float64
int32                      int32
int64                      int64
int16                      int16
datetime          datetime64[ns]
object_string             object
object_decimal            object
object_date               object
dtype: object

In [0]:
# converting pandas on spark df to pyspark df 

sdf = psdf.to_spark()

In [0]:
sdf 

int8,bool,float32,float64,int32,int64,int16,datetime,object_string,object_decimal,object_date
1,True,1.0,1.0,1,1,1,2020-10-27 00:00:00,1,1.1,2020-10-27


In [0]:
sdf.dtypes

[('int8', 'tinyint'),
 ('bool', 'boolean'),
 ('float32', 'float'),
 ('float64', 'double'),
 ('int32', 'int'),
 ('int64', 'bigint'),
 ('int16', 'smallint'),
 ('datetime', 'timestamp'),
 ('object_string', 'string'),
 ('object_decimal', 'string'),
 ('object_date', 'date')]

In [0]:
# convert pandas=on-spark df to pandas df

pdf = psdf.to_pandas()

pdf.dtypes

int8                        int8
bool                        bool
float32                  float32
float64                  float64
int32                      int32
int64                      int64
int16                      int16
datetime          datetime64[ns]
object_string             object
object_decimal            object
object_date               object
dtype: object

In [0]:

# categorical data casting is not supported in pyspark 

# ps.Series([pd.Categorical(1,2,3)])

In [0]:
# pandas api on spark

from pyspark.pandas.typedef import as_spark_type 

as_spark_type(int)

LongType()

In [0]:
as_spark_type(np.int32)

IntegerType()

In [0]:
import typing 

as_spark_type(typing.List[float])

ArrayType(DoubleType(), True)

In [0]:
ps.Series([0.3,0.1,0.8]).spark.data_type 

DoubleType()

In [0]:
ps.Series(["welcome","to","pandas-on-spark"]).spark.data_type

StringType()

In [0]:
ps.DataFrame({"d" : [0.3,0.24,0.8],
              "e" : ["welcome","to","pandas-on-spark"],
              "f" : [False,True,False] }).spark.print_schema()

root
 |-- d: double (nullable = false)
 |-- e: string (nullable = false)
 |-- f: boolean (nullable = false)



In [0]:
# Multiple datatypes are not supported by pandas api on spark
# ps.Series([1,"A"])


# Plotting 

In [0]:
pser = pd.Series(np.random.randn(1000),
                 index = pd.date_range('1/1/2000',periods=1000))

psser = ps.Series(pser)

psser = psser.cummax()

psser.plot()

In [0]:
pdf = pd.DataFrame(np.random.randn(1000, 4),index = pser.index, 
                   columns= ['A','B','C','D'])

psdf = ps.from_pandas(pdf)

psdf = psdf.cummax()

psdf.plot()
