In [4]:
import pandas as pd
import numpy as np
import pyspark.pandas as ps
from pyspark.sql import SparkSession
import os

# Object Creation

Creating a pandas-on-Spark Series by passing a list of values, letting pandas API on Spark create a default integer index:

In [5]:
# Build a pandas-on-Spark Series from a plain list; no index is given, so a
# default integer index is created. `np` comes from the notebook's import cell,
# so it is not re-imported here.
s = ps.Series([1, 3, 5, np.nan, 6, 8])

23/09/17 17:21:22 WARN Utils: Your hostname, juan-MS-7A57 resolves to a loopback address: 127.0.1.1; using 192.168.20.22 instead (on interface enp6s0)
23/09/17 17:21:22 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/17 17:21:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/09/17 17:21:23 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/09/17 17:21:23 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [6]:
# Display the Series; the np.nan entry means the values are shown as float64.
s

                                                                                

0    1.0
1    3.0
2    5.0
3    NaN
4    6.0
5    8.0
dtype: float64

In [7]:
# Build a pandas-on-Spark DataFrame from a dict of equal-length columns,
# using an explicit integer index instead of the default one.
psdf = ps.DataFrame(
    data={
        'a': [1, 2, 3, 4, 5, 6],
        'b': [100, 200, 300, 400, 500, 600],
        'c': ["one", "two", "three", "four", "five", "six"],
    },
    index=[10, 20, 30, 40, 50, 60],
)

In [8]:
# Display the pandas-on-Spark DataFrame (rows are labelled by the custom index).
psdf

Unnamed: 0,a,b,c
10,1,100,one
20,2,200,two
30,3,300,three
40,4,400,four
50,5,500,five
60,6,600,six


Creating a pandas DataFrame by passing a numpy array with labeled columns (the datetime index `dates` created first is used later, in the Missing Data section):

In [9]:
# Six consecutive calendar days starting on 2013-01-01.
dates = pd.date_range(start='20130101', periods=6)

In [10]:
# Display the DatetimeIndex created above.
dates

DatetimeIndex(['2013-01-01', '2013-01-02', '2013-01-03', '2013-01-04',
               '2013-01-05', '2013-01-06'],
              dtype='datetime64[ns]', freq='D')

In [12]:
# 10 x 4 frame of standard-normal samples with columns A-D and the default
# integer index. No seed is set, so the values differ on every run.
pdf = pd.DataFrame(data=np.random.standard_normal((10, 4)), columns=list('ABCD'))

In [13]:
# Display the plain pandas DataFrame.
pdf

Unnamed: 0,A,B,C,D
0,1.177366,0.42103,0.340005,-0.724065
1,0.624374,-0.16056,1.626612,-0.173201
2,-1.10619,-2.706917,0.950834,1.299101
3,-0.022283,-0.389551,0.826336,-0.103164
4,0.098626,-0.808993,-2.849881,-0.877243
5,-1.246611,-0.153109,-0.764095,-0.647557
6,0.057026,-0.701886,-1.653814,0.819349
7,0.694305,-0.033926,0.571393,0.059759
8,-1.203787,-0.92266,-0.653881,-0.989071
9,-0.65075,0.999133,0.532384,-1.075776


Now, this pandas DataFrame can be converted to a pandas-on-Spark DataFrame:

In [14]:
# Convert the pandas DataFrame into a pandas-on-Spark DataFrame.
# Note: this rebinds `psdf`, replacing the frame created in an earlier cell.
psdf = ps.from_pandas(pdf)
# Display the converted frame.
psdf

Unnamed: 0,A,B,C,D
0,1.177366,0.42103,0.340005,-0.724065
1,0.624374,-0.16056,1.626612,-0.173201
2,-1.10619,-2.706917,0.950834,1.299101
3,-0.022283,-0.389551,0.826336,-0.103164
4,0.098626,-0.808993,-2.849881,-0.877243
5,-1.246611,-0.153109,-0.764095,-0.647557
6,0.057026,-0.701886,-1.653814,0.819349
7,0.694305,-0.033926,0.571393,0.059759
8,-1.203787,-0.92266,-0.653881,-0.989071
9,-0.65075,0.999133,0.532384,-1.075776


In [15]:
# Obtain a SparkSession: reuse the active one if it exists, otherwise create it.
spark = SparkSession.builder.getOrCreate()

In [16]:
# Create a Spark DataFrame from the pandas DataFrame.
sdf = spark.createDataFrame(pdf)
# Print the Spark DataFrame as a text table.
sdf.show()

  if should_localize and is_datetime64tz_dtype(s.dtype) and s.dt.tz is not None:


+--------------------+--------------------+-------------------+--------------------+
|                   A|                   B|                  C|                   D|
+--------------------+--------------------+-------------------+--------------------+
|   1.177365713824022| 0.42102993057929955| 0.3400047044385909| -0.7240652077025155|
|  0.6243744015938828| -0.1605597326426859|  1.626611979531416|-0.17320087516630064|
|  -1.106190012869022|   -2.70691745594171| 0.9508342870422485|   1.299100550451211|
|-0.02228325972603...| -0.3895512456992725|  0.826336025735948|-0.10316415532826517|
|  0.0986258469397019| -0.8089931037133232| -2.849881061304416| -0.8772430149888827|
| -1.2466113640723373|-0.15310918377244487|-0.7640952395505964| -0.6475567036906257|
| 0.05702601534002215|  -0.701885758057203|-1.6538144774819512|   0.819349296548811|
|  0.6943047434055223|-0.03392561437935...| 0.5713933558655782|0.059758979286008665|
| -1.2037865378936174| -0.9226604233404867|-0.6538812320446808| -

In [18]:
# Convert the Spark DataFrame into a pandas-on-Spark DataFrame (rebinds `psdf`).
psdf = sdf.pandas_api()
# Preview the first rows only.
psdf.head()

Unnamed: 0,A,B,C,D
0,1.177366,0.42103,0.340005,-0.724065
1,0.624374,-0.16056,1.626612,-0.173201
2,-1.10619,-2.706917,0.950834,1.299101
3,-0.022283,-0.389551,0.826336,-0.103164
4,0.098626,-0.808993,-2.849881,-0.877243


In [19]:
# Check which class `psdf` is after the conversion.
type(psdf)

pyspark.pandas.frame.DataFrame

# Missing Data

Pandas API on Spark primarily uses the value `np.nan` to represent missing data. By default, it is not included in computations.

In [20]:
# Re-label the rows with the first four entries of `dates` and add a new,
# empty column 'E'. `reindex` fills any label not present in `pdf` with NaN.
# NOTE(review): `pdf` is built without an explicit index, so none of the date
# labels match and columns A-D come out entirely NaN -- confirm this is intended.
pdf1 = pdf.reindex(index=dates[:4], columns=[*pdf.columns, 'E'])

In [21]:
# Set column 'E' to 1 for the rows labelled dates[0] through dates[1]
# (label-based slices with .loc include both endpoints); this mutates pdf1 in place.
pdf1.loc[dates[0]:dates[1], 'E'] = 1

In [22]:
# Convert the pandas frame containing missing values into a pandas-on-Spark DataFrame.
psdf1 = ps.from_pandas(pdf1)

  if is_datetime64tz_dtype(s.dtype):


In [None]:
# Display the pandas-on-Spark DataFrame that contains missing values.
psdf1

In [24]:
import warnings

# Remember the current Arrow setting so it can be restored after the benchmark.
prev = spark.conf.get("spark.sql.execution.arrow.pyspark.enabled")

# Switch to the "distributed" default index to avoid index overhead while timing.
ps.set_option("compute.default_index_type", "distributed")

# Silence all Python warnings from this point on. This is a global filter, so it
# also hides warnings raised by every later cell.
warnings.filterwarnings("ignore")

In [25]:
# Benchmark with Arrow enabled: time the conversion of a 300000-element
# pandas-on-Spark range to pandas.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", True)
%timeit ps.range(300000).to_pandas()

117 ms ± 16.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [26]:
# Same benchmark with Arrow disabled, for comparison with the Arrow-enabled timing.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", False)
%timeit ps.range(300000).to_pandas()

664 ms ± 50.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [27]:
# Undo the benchmark configuration: reset the pandas-on-Spark index option ...
ps.reset_option("compute.default_index_type")
# ... and restore the Arrow setting to the value saved in `prev`.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", prev)  # Restore the saved value.

# Grouping

In [28]:
# Eight-row pandas-on-Spark DataFrame for the grouping examples: two string
# columns (A, B) and two columns of random normal values (C, D).
psdf = ps.DataFrame(
    {
        'A': ['foo', 'bar', 'foo', 'bar', 'foo', 'bar', 'foo', 'foo'],
        'B': ['one', 'one', 'two', 'three', 'two', 'two', 'one', 'three'],
        'C': np.random.randn(8),
        'D': np.random.randn(8),
    }
)

psdf

Unnamed: 0,A,B,C,D
0,foo,one,-0.604324,1.548133
1,bar,one,-0.610522,0.578099
2,foo,two,-2.176163,-0.462768
3,bar,three,-0.289412,0.607628
4,foo,two,0.575108,-1.204249
5,bar,two,-0.867408,2.14946
6,foo,one,-0.366477,0.19903
7,foo,three,0.600554,0.248624


In [29]:
# Group rows by the values in column 'A' and sum within each group.
psdf.groupby('A').sum()

Unnamed: 0_level_0,C,D
A,Unnamed: 1_level_1,Unnamed: 2_level_1
foo,-1.971301,0.328771
bar,-1.767342,3.335186


# Plotting

In [30]:
# pandas Series of 1000 random normal values indexed by consecutive days
# starting on 2000-01-01.
pser = pd.Series(np.random.randn(1000),
                 index=pd.date_range('1/1/2000', periods=1000))
# Convert to a pandas-on-Spark Series, then replace it with its running maximum.
psser = ps.Series(pser)
psser = psser.cummax()
# pandas uses matplotlib as its default plotting backend.
# NOTE(review): only the Series is displayed here; no plot call (e.g.
# psser.plot()) is visible in this cell -- confirm whether one follows.
psser