In [1]:
pip install -r ../requirements.txt

Note: you may need to restart the kernel to use updated packages.


In [10]:
import os

# Export the Spark version for the libraries imported in the following cells.
# NOTE(review): presumably pydeequ reads this when it is imported, in order to
# pick the matching Deequ artifact (the resolver log below shows a "spark-3.1"
# build) - keep this cell before `import pydeequ` and confirm against pydeequ docs.
os.environ['SPARK_VERSION'] = '3.1'

In [12]:
from pyspark.sql import SparkSession, Row

import pydeequ
from pydeequ.analyzers import *
from pydeequ.profiles import *
from pydeequ.checks import *
from pydeequ.verification import *

In [13]:
# Create (or reuse) the Spark session used by every later cell.
# Two settings are applied before the session is materialised:
#   - spark.jars.packages: the Deequ Maven coordinate exposed by pydeequ
#   - spark.jars.excludes: the f2j Maven coordinate exposed by pydeequ
_session_builder = SparkSession.builder
_session_builder = _session_builder.config("spark.jars.packages", pydeequ.deequ_maven_coord)
_session_builder = _session_builder.config("spark.jars.excludes", pydeequ.f2j_maven_coord)
spark = _session_builder.getOrCreate()

:: loading settings :: url = jar:file:/Users/jose/Documents/data_quality_tools/jupyter/jup_env/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/jose/.ivy2/cache
The jars for the packages stored in: /Users/jose/.ivy2/jars
com.amazon.deequ#deequ added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-3ca4d226-97a3-48fc-90ad-55ed238bb092;1.0
	confs: [default]
	found com.amazon.deequ#deequ;2.0.0-spark-3.1 in central
	found org.scalanlp#breeze_2.12;0.13.2 in central
	found org.scalanlp#breeze-macros_2.12;0.13.2 in central
	found org.scala-lang#scala-reflect;2.12.1 in central
	found com.github.fommil.netlib#core;1.1.2 in central
	found net.sf.opencsv#opencsv;2.3 in central
	found com.github.rwl#jtransforms;2.4.0 in central
	found junit#junit;4.8.2 in central
	found org.apache.commons#commons-math3;3.2 in central
	found org.spire-math#spire_2.12;0.13.0 in central
	found org.spire-math#spire-macros_2.12;0.13.0 in central
	found org.typelevel#machinist_2.12;0.6.1 in central
	found com.chuusai#shapeless_2.12;2.3.2 in central
	found org.typelevel#macro-compat_2.12;1.1.1 in 

In [86]:
from pyspark.sql.functions import col
from pyspark.sql.types import FloatType,BooleanType,DateType

# Resale price columns that are renamed to ASCII identifiers and cast to float.
# Maps the header found in the file to the name used in the rest of the notebook.
# NOTE(review): presumably renamed so the columns can be referenced by the
# column-comparison constraint later on - confirm.
RESALE_PRICE_COLUMNS = {
    "PREÇO MÍNIMO REVENDA": "PRECO_MINIMO_REVENDA",
    "PREÇO MÁXIMO REVENDA": "PRECO_MAXIMO_REVENDA",
}

# Read the tab-separated file with its header row. The separator is written as
# the "\t" escape instead of a literal TAB character, which is invisible in the
# source and easily converted to spaces by an editor.
df = spark.read.option("header", True).csv("../data/2004-2021.tsv", sep="\t")

# Rename in place (withColumnRenamed keeps the column position) and then cast,
# so the resulting schema order is the same as in the file.
for source_name, ascii_name in RESALE_PRICE_COLUMNS.items():
    df = (
        df.withColumnRenamed(source_name, ascii_name)
        .withColumn(ascii_name, col(ascii_name).cast(FloatType()))
    )

df.printSchema()

root
 |-- DATA INICIAL: string (nullable = true)
 |-- DATA FINAL: string (nullable = true)
 |-- REGIÃO: string (nullable = true)
 |-- ESTADO: string (nullable = true)
 |-- PRODUTO: string (nullable = true)
 |-- NÚMERO DE POSTOS PESQUISADOS: string (nullable = true)
 |-- UNIDADE DE MEDIDA: string (nullable = true)
 |-- PREÇO MÉDIO REVENDA: string (nullable = true)
 |-- DESVIO PADRÃO REVENDA: string (nullable = true)
 |-- PRECO_MINIMO_REVENDA: float (nullable = true)
 |-- PRECO_MAXIMO_REVENDA: float (nullable = true)
 |-- MARGEM MÉDIA REVENDA: string (nullable = true)
 |-- COEF DE VARIAÇÃO REVENDA: string (nullable = true)
 |-- PREÇO MÉDIO DISTRIBUIÇÃO: string (nullable = true)
 |-- DESVIO PADRÃO DISTRIBUIÇÃO: string (nullable = true)
 |-- PREÇO MÍNIMO DISTRIBUIÇÃO: string (nullable = true)
 |-- PREÇO MÁXIMO DISTRIBUIÇÃO: string (nullable = true)
 |-- COEF DE VARIAÇÃO DISTRIBUIÇÃO: string (nullable = true)



In [38]:
# Allowed values for the ESTADO column: upper-case names written without accents.
# The list is passed to the containment constraint of the data-quality check.
ESTADOS = [
    "DISTRITO FEDERAL",
    "GOIAS",
    "MATO GROSSO",
    "MATO GROSSO DO SUL",
    "ALAGOAS",
    "BAHIA",
    "CEARA",
    "MARANHAO",
    "PARAIBA",
    "PERNAMBUCO",
    "PIAUI",
    "RIO GRANDE DO NORTE",
    "SERGIPE",
    "ACRE",
    "AMAPA",
    "AMAZONAS",
    "PARA",
    "RONDONIA",
    "RORAIMA",
    "TOCANTINS",
    "ESPIRITO SANTO",
    "MINAS GERAIS",
    "RIO DE JANEIRO",
    "SAO PAULO",
    "PARANA",
    "RIO GRANDE DO SUL",
    "SANTA CATARINA",
]

In [87]:
# Data-quality check at Warning level, run against the loaded price data.
check = Check(spark, CheckLevel.Warning, "Gas prices in Brazil")

# Constraints attached to the check, in order:
#   1. "DATA INICIAL" has no missing values.
#   2. "ESTADO" only holds values listed in ESTADOS.
#   3. "NÚMERO DE POSTOS PESQUISADOS" holds numeric values.
#   4. "UNIDADE DE MEDIDA" matches the regex ^R\$/ (i.e. contains text starting
#      with the literal characters "R$/"). A raw string is used because "\$" is
#      not a valid escape sequence in a normal Python string literal.
#   5. "PRECO_MAXIMO_REVENDA" >= "PRECO_MINIMO_REVENDA".
# The chain is parenthesised instead of using backslash continuations, which
# break silently on trailing whitespace.
checkResult = (
    VerificationSuite(spark)
    .onData(df)
    .addCheck(
        check.isComplete("DATA INICIAL")
        .isContainedIn("ESTADO", ESTADOS)
        .hasDataType("NÚMERO DE POSTOS PESQUISADOS", ConstrainableDataTypes.Numeric)
        .hasPattern("UNIDADE DE MEDIDA", r"^R\$/")
        .isGreaterThanOrEqualTo("PRECO_MAXIMO_REVENDA", "PRECO_MINIMO_REVENDA")
    )
    .run()
)


                                                                                

In [88]:
# Report the overall status of the verification run.
status_line = "Verification Run Status: {}".format(checkResult.status)
print(status_line)

# Convert the per-constraint results to a Spark DataFrame and display it.
checkResult_df = VerificationResult.checkResultsAsDataFrame(spark, checkResult)
checkResult_df.show()

Verification Run Status: Success
+--------------------+-----------+------------+--------------------+-----------------+------------------+
|               check|check_level|check_status|          constraint|constraint_status|constraint_message|
+--------------------+-----------+------------+--------------------+-----------------+------------------+
+--------------------+-----------+------------+--------------------+-----------------+------------------+

