In [None]:
!pip install pyspark

In [1]:
!pip show pyspark

Name: pyspark
Version: 3.2.1
Summary: Apache Spark Python API
Home-page: https://github.com/apache/spark/tree/master/python
Author: Spark Developers
Author-email: dev@spark.apache.org
License: http://www.apache.org/licenses/LICENSE-2.0
Location: /Users/avkash/anaconda3/envs/python38/lib/python3.8/site-packages
Requires: py4j
Required-by: 


In [2]:
import pandas as pd
import great_expectations as ge

In [3]:
from pyspark.sql import SparkSession

# Reuse an active Spark session if there is one, otherwise start one with default settings.
spark = (
    SparkSession.builder
    .getOrCreate()
)

22/03/08 15:58:03 WARN Utils: Your hostname, MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.0.206 instead (on interface en0)
22/03/08 15:58:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/08 15:58:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Four sample records. Two values are what the expectations further down react to:
# ID 103 has a missing (None) Status, and ID 102 has a Price of 200.50.
data = [
    {"Category": "A", "ID": 101, "Price": 55.00, "Status": True},
    {"Category": "B", "ID": 102, "Price": 200.50, "Status": False},
    {"Category": "C", "ID": 103, "Price": 9.99, "Status": None},
    {"Category": "E", "ID": 104, "Price": 18.35, "Status": True},
]

In [5]:
# Create a pandas DataFrame from the list of records.
# `DataFrame.from_dict` is documented for dict input; the plain constructor is the
# idiomatic way to build a frame from a list of dicts (columns follow the key order).
pd_df = pd.DataFrame(data)

In [6]:
# Display the pandas frame (the last expression renders as the cell output).
pd_df

Unnamed: 0,Category,ID,Price,Status
0,A,101,55.0,True
1,B,102,200.5,False
2,C,103,9.99,
3,E,104,18.35,True


In [7]:
# Confirm this is a plain pandas DataFrame (compare with the Spark type further down).
type(pd_df)

pandas.core.frame.DataFrame

In [8]:
# Build the Spark DataFrame from the pandas frame so Spark infers the schema
# from the pandas columns instead of us declaring one by hand.
sdf = spark.createDataFrame(pd_df)

In [9]:
# The repr lists the schema Spark inferred from pandas (note Status came out as boolean).
sdf

DataFrame[Category: string, ID: bigint, Price: double, Status: boolean]

In [10]:
# Print the rows; the missing Status value is rendered by Spark as null.
sdf.show()

                                                                                

+--------+---+-----+------+
|Category| ID|Price|Status|
+--------+---+-----+------+
|       A|101| 55.0|  true|
|       B|102|200.5| false|
|       C|103| 9.99|  null|
|       E|104|18.35|  true|
+--------+---+-----+------+



In [11]:
# Confirm this is a PySpark DataFrame, not a pandas one.
type(sdf)

pyspark.sql.dataframe.DataFrame

# Great Expectations objects


In [12]:
# With pandas, wrap the frame in a Great Expectations dataset via `ge.from_pandas`.
pd_df_ge = ge.from_pandas(pd_df)

In [13]:
# The GE pandas dataset still displays like the underlying pandas frame.
pd_df_ge

Unnamed: 0,Category,ID,Price,Status
0,A,101,55.0,True
1,B,102,200.5,False
2,C,103,9.99,
3,E,104,18.35,True


## Great Expectations object from a Spark DataFrame

In [14]:
# With PySpark, wrap the Spark DataFrame directly in `ge.dataset.SparkDFDataset`.
sdf_ge = ge.dataset.SparkDFDataset(sdf)

In [15]:
# Unlike the pandas version, the SparkDFDataset repr shows only the object, not the data.
sdf_ge

<great_expectations.dataset.sparkdf_dataset.SparkDFDataset at 0x7fa009997220>

In [None]:
# List the expectation methods (the `expect_*` names) available on the Spark dataset.
sorted(name for name in dir(sdf_ge) if name.startswith("expect_"))

In [16]:
# Expect every Status value to be non-null; ID 103's missing Status makes this fail.
sdf_ge.expect_column_values_to_not_be_null(column='Status')

{
  "result": {
    "element_count": 4,
    "unexpected_count": 1,
    "unexpected_percent": 25.0,
    "unexpected_percent_total": 25.0,
    "partial_unexpected_list": [
      null
    ]
  },
  "success": false,
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [19]:
# Expect every Price to lie between 1 and 100; the 200.5 row falls outside, so this fails.
sdf_ge.expect_column_values_to_be_between('Price', min_value=1, max_value=100)

{
  "result": {
    "element_count": 4,
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_count": 1,
    "unexpected_percent": 25.0,
    "unexpected_percent_total": 25.0,
    "unexpected_percent_nonmissing": 25.0,
    "partial_unexpected_list": [
      200.5
    ]
  },
  "success": false,
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [20]:
# NOTE(review): passing a list instead of a single column name reports success: false
# (see output) even though Price exists — the next cell passes the plain string and
# succeeds. Confirm this contrast is intentional.
sdf_ge.expect_column_to_exist(['Price'])

{
  "result": {},
  "success": false,
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [21]:
# Pass the column name as a plain string; this check succeeds.
sdf_ge.expect_column_to_exist(column='Price')

{
  "result": {},
  "success": true,
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

# Working with the GE SparkDFDataset vs. the underlying Spark DataFrame

In [22]:
# Row count straight from the Spark DataFrame.
sdf.count()

4

In [23]:
# SparkDFDataset does not expose DataFrame methods such as count(). Show the
# AttributeError instead of letting it abort Restart & Run All.
try:
    sdf_ge.count()
except AttributeError as err:
    print(f"AttributeError: {err}")

AttributeError: 'SparkDFDataset' object has no attribute 'count'

In [24]:
# The wrapped Spark DataFrame is reachable through the `spark_df` attribute.
sdf_ge.spark_df.count()

4