In [1]:
import findspark
findspark.init('C:\Spark\spark-2.4.4-bin-hadoop2.7')

from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_format, udf 
from pyspark.sql.types import DateType

In [2]:
spark = SparkSession \
    .builder \
    .appName("Basic JDBC pipeline") \
    .config("spark.driver.extraClassPath", "postgresql-42.1.4.jar") \
    .config("spark.executor.extraClassPath", "postgresql-42.1.4.jar") \
    .getOrCreate()

In [5]:
# ---------------------------------TAREA CON SET DE DATOS FACTURA -------------------------------
df = spark \
    .read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost/postgres") \
    .option("user", "postgres") \
    .option("password", "Sqlserver2008") \
    .option("query", "select ltrim(rtrim((articulo))) as articulos,* from factura") \
    .load()

df.show()

+----------+-----+--------------------+-------------------+--------+--------------------+--------------------+
| articulos|   id|             factura|      fecha_factura|cantidad|        precio_total|            articulo|
+----------+-----+--------------------+-------------------+--------+--------------------+--------------------+
|   MU08534| 2422|10000101000001000...|2019-05-02 00:00:00|     101|20099.00000000000...|MU08534          ...|
|     75555| 3185|10000101000001000...|2019-05-05 00:00:00|     116|92800.00000000000...|75555            ...|
|     75555| 5328|10000101000001000...|2019-05-05 00:00:00|     300|930000.0000000000...|75555            ...|
|   1712162|10785|10000101000001000...|2019-04-27 00:00:00|     121|20000.00000000000...|1712162          ...|
|   5060781|11153|10000101000001000...|2019-04-27 00:00:00|     121|20000.00000000000...|5060781          ...|
|1012451-12|11406|10000101000001000...|2019-04-18 00:00:00|     101|20482.00000000000...|1012451-12       ...|
|

In [6]:
formatted_df = df.withColumn("date_string", date_format(col("fecha_factura"), 'MM/dd/yyyy'))
formatted_df.show()

+----------+-----+--------------------+-------------------+--------+--------------------+--------------------+-----------+
| articulos|   id|             factura|      fecha_factura|cantidad|        precio_total|            articulo|date_string|
+----------+-----+--------------------+-------------------+--------+--------------------+--------------------+-----------+
|   MU08534| 2422|10000101000001000...|2019-05-02 00:00:00|     101|20099.00000000000...|MU08534          ...| 05/02/2019|
|     75555| 3185|10000101000001000...|2019-05-05 00:00:00|     116|92800.00000000000...|75555            ...| 05/05/2019|
|     75555| 5328|10000101000001000...|2019-05-05 00:00:00|     300|930000.0000000000...|75555            ...| 05/05/2019|
|   1712162|10785|10000101000001000...|2019-04-27 00:00:00|     121|20000.00000000000...|1712162          ...| 04/27/2019|
|   5060781|11153|10000101000001000...|2019-04-27 00:00:00|     121|20000.00000000000...|5060781          ...| 04/27/2019|
|1012451-12|1140

In [7]:
string_to_date = \
    udf(lambda text_date: datetime.strptime(text_date, '%m/%d/%Y'),
        DateType())

typed_df = formatted_df.withColumn("date", string_to_date(formatted_df.date_string))
typed_df.show()
typed_df.printSchema()

+----------+-----+--------------------+-------------------+--------+--------------------+--------------------+-----------+----------+
| articulos|   id|             factura|      fecha_factura|cantidad|        precio_total|            articulo|date_string|      date|
+----------+-----+--------------------+-------------------+--------+--------------------+--------------------+-----------+----------+
|   MU08534| 2422|10000101000001000...|2019-05-02 00:00:00|     101|20099.00000000000...|MU08534          ...| 05/02/2019|2019-05-02|
|     75555| 3185|10000101000001000...|2019-05-05 00:00:00|     116|92800.00000000000...|75555            ...| 05/05/2019|2019-05-05|
|     75555| 5328|10000101000001000...|2019-05-05 00:00:00|     300|930000.0000000000...|75555            ...| 05/05/2019|2019-05-05|
|   1712162|10785|10000101000001000...|2019-04-27 00:00:00|     121|20000.00000000000...|1712162          ...| 04/27/2019|2019-04-27|
|   5060781|11153|10000101000001000...|2019-04-27 00:00:00|   

In [8]:
sum_df = typed_df.groupBy("articulos", "date").sum()
sum_df.show()

+----------+----------+-------+--------------------+-------------+--------------------+
| articulos|      date|sum(id)|        sum(factura)|sum(cantidad)|   sum(precio_total)|
+----------+----------+-------+--------------------+-------------+--------------------+
|1012270-15|2019-02-14|   1501|20000202000001600...|          202|40800.00000000000...|
|   2M51119|2019-01-17|    990|10000101000000700...|          101|20033.00000000000...|
| F00006019|2019-02-16|   1971|10000101000000800...|          200|23434.00000000000...|
| 1005357HJ|2019-02-24| 209927|90000921000002400...|          911|182340.0000000000...|
|1012272-20|2019-05-09|   8000|20000202000002000...|          202|40964.00000000000...|
|1012272-15|2019-02-27|   2897|10000101000000800...|          101|20482.00000000000...|
|     42020|2019-03-13|   8258|20000202000001700...|          302|1730366.630000000...|
|1012270-12|2019-04-18|  79914|14000141400001400...|         1415|286267.0000000000...|
|     11111|2019-05-05|  23723|4

In [11]:
factura_df = \
    sum_df.select(
        col('articulos'),
        col('date'),
        col('sum(cantidad)').alias('Cantidad'))

factura_df.printSchema()
factura_df.show()

root
 |-- articulos: string (nullable = true)
 |-- date: date (nullable = true)
 |-- Cantidad: long (nullable = true)

+----------+----------+--------+
| articulos|      date|Cantidad|
+----------+----------+--------+
|1012270-15|2019-02-14|     202|
|   2M51119|2019-01-17|     101|
| F00006019|2019-02-16|     200|
| 1005357HJ|2019-02-24|     911|
|1012272-20|2019-05-09|     202|
|1012272-15|2019-02-27|     101|
|     42020|2019-03-13|     302|
|1012270-12|2019-04-18|    1415|
|     11111|2019-05-05|     424|
|1120300-23|2019-02-16|     101|
|     70010|2019-02-01|     103|
|     42000|2019-02-02|     361|
|     64016|2019-03-27|     220|
|1120350-28|2019-03-31|     101|
|    SPD-03|2019-04-24|     538|
|1010481-HJ|2019-05-26|     101|
|     75555|2019-05-31|    1720|
|   8722960|2019-05-31|     150|
|    SPD-03|2019-05-22|     103|
|1012274-12|2019-05-18|     101|
+----------+----------+--------+
only showing top 20 rows



In [12]:
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

articulo_df = spark \
    .read \
    .format("csv") \
    .option("path", "articulo.csv") \
    .option("header", True) \
    .schema(StructType([
                
                StructField("articulo", StringType()),
                StructField("descripcion", StringType())])) \
    .load()

articulo_df.printSchema()
articulo_df.show()

root
 |-- articulo: string (nullable = true)
 |-- descripcion: string (nullable = true)

+--------------------+--------------------+
|            articulo|         descripcion|
+--------------------+--------------------+
|             4002782|    Bolsa EVA 2000ml|
|  DYNJS3013(MUESTRA)|Paquete de Cirugí...|
| DYNJS2500H(MUESTRA)|Cobertor de mayo ...|
|          DYNJS2500H|Cobertor de mayo ...|
| DYNJP2500H(MUESTRA)|Cobertor de mayod...|
|          DYNJP2500H|Cobertor de mayod...|
|100/570/000(MUESTRA)|Filtro HME para t...|
|            8728852F|Jeringade 50 ml c...|
|            8728828F|Jeringa de 50ml c...|
|            8728801F|Jeringa de 50ml c...|
|             8728623|Jeringa de 20ml c...|
|          S20K11S07N|Transformador sec...|
|          DYNJP2002S|BATA QUIRURGICA X...|
|          DYNJP3020S|  KIT DE LAPAROTOMIA|
|          DYNJP6110S|      KIT DE CESAREA|
|              398580|Load Cell Arm Hol...|
|              141406|Mounting Plate RS...|
|             2025088|Column Co

In [13]:
joint_df = factura_df.join(articulo_df, factura_df.articulos == articulo_df.articulo)
joint_df.printSchema()
joint_df.show()

root
 |-- articulos: string (nullable = true)
 |-- date: date (nullable = true)
 |-- Cantidad: long (nullable = true)
 |-- articulo: string (nullable = true)
 |-- descripcion: string (nullable = true)

+----------+----------+--------+----------+--------------------+
| articulos|      date|Cantidad|  articulo|         descripcion|
+----------+----------+--------+----------+--------------------+
|1012270-15|2019-02-14|     202|1012270-15|Balon Coronario M...|
|   2M51119|2019-01-17|     101|   2M51119|       Centring Lugs|
| F00006019|2019-02-16|     200| F00006019|Multibic 2mmol/L ...|
| 1005357HJ|2019-02-24|     911| 1005357HJ|Guia para Angiopl...|
|1012272-20|2019-05-09|     202|1012272-20|Balon de angiopla...|
|1012272-15|2019-02-27|     101|1012272-15|Balon de angiopla...|
|     42020|2019-03-13|     302|     42020|Enterex Fresa  (p...|
|1012270-12|2019-04-18|    1415|1012270-12|Balon de angiopla...|
|     11111|2019-05-05|     424|     11111|Enterex Espesante...|
|1120300-23|2019-0