In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import types
from pyspark.sql.types import FloatType
from pyspark.sql import functions as F


In [2]:
output = 'gs://dl-eu-pub-tender/results'

In [3]:
credentials_location = '/home/valdas/.google/credentials/google_credentials.json'

# set spark configuration

conf = SparkConf() \
    .setMaster('local[*]') \
    .setAppName('test') \
    .set('spark.jars', '/home/valdas/lib/gcs-connector-hadoop3-2.2.5.jar') \
    .set('spark.hadoop.google.cloud.auth.service.account.enable', 'true') \
    .set('spark.hadoop.google.cloud.auth.service.account.json.keyfile', credentials_location) \
    .set("spark.sql.legacy.parquet.int96RebaseModeInRead", "CORRECTED") \
    .set("spark.sql.legacy.parquet.int96RebaseModeInWrite", "CORRECTED") \
    .set("spark.sql.legacy.parquet.datetimeRebaseModeInRead", "CORRECTED") \
    .set("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED") 

# set spark context
sc = SparkContext(conf=conf)

hadoop_conf = sc._jsc.hadoopConfiguration()

hadoop_conf.set('fs.AbstractFileSystem.gs.impl',  'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS')
hadoop_conf.set('fs.gs.impl', 'com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem')
hadoop_conf.set('fs.gs.auth.service.account.json.keyfile', credentials_location)
hadoop_conf.set('fs.gs.auth.service.account.enable', 'true')

# create spark session

spark = SparkSession.builder \
    .config(conf = sc.getConf()) \
    .getOrCreate()

22/04/15 10:20:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/04/15 10:20:23 WARN SQLConf: The SQL config 'spark.sql.legacy.parquet.int96RebaseModeInWrite' has been deprecated in Spark v3.2 and may be removed in the future. Use 'spark.sql.parquet.int96RebaseModeInWrite' instead.
22/04/15 10:20:23 WARN SQLConf: The SQL config 'spark.sql.legacy.parquet.datetimeRebaseModeInRead' has been deprecated in Spark v3.2 and may be removed in the future. Use 'spark.sql.parquet.datetimeRebaseModeInRead' instead.
22/04/15 10:20:23 WARN SQLConf: The SQL config 'spark.sql.legacy.parquet.int96RebaseModeInRead' has been deprecated in Spark v3.2 and may be removed in the future. Use 'spark.sql.parquet.int96RebaseMod

In [4]:
schema = types.StructType([
    types.StructField ("tender_row_nr", types.IntegerType(), True),          
    types.StructField('tender_id', types.StringType(), True),
    types.StructField('tender_country', types.StringType(), True),
    types.StructField('tender_title', types.StringType(), True),
    types.StructField('tender_size', types.StringType(), True),
    types.StructField('tender_supplyType', types.StringType(), True),
    types.StructField('tender_procedureType', types.StringType(), True),
    types.StructField('tender_nationalProcedureType', types.StringType(), True),
    types.StructField('tender_mainCpv', types.StringType(), True),
    types.StructField('tender_cpvs', types.StringType(), True),
    types.StructField('tender_addressOfImplementation_nuts', types.StringType(), True),
    types.StructField('tender_year', types.TimestampType(), True),
    types.StructField('tender_eligibleBidLanguages', types.StringType(), True),
    types.StructField('tender_npwp_reasons', types.StringType(), True),
    types.StructField('tender_awardDeadline', types.TimestampType(), True),
    types.StructField('tender_contractSignatureDate', types.TimestampType(), True),
    types.StructField('tender_awardDecisionDate', types.TimestampType(), True),
    types.StructField('tender_bidDeadline', types.TimestampType(), True),
    types.StructField('tender_cancellationDate', types.TimestampType(), True),
    types.StructField('tender_estimatedStartDate', types.TimestampType(), True),
    types.StructField('tender_estimatedCompletionDate', types.TimestampType(), True),
    types.StructField('tender_estimatedDurationInYears', types.IntegerType(), True),
    types.StructField('tender_estimatedDurationInMonths', types.IntegerType(), True),
    types.StructField('tender_estimatedDurationInDays', types.IntegerType(), True),
    types.StructField('tender_isEUFunded', types.StringType(), True),
    types.StructField('tender_isDps', types.StringType(), True),
    types.StructField('tender_isElectronicAuction', types.StringType(), True),
    types.StructField('tender_isAwarded', types.StringType(), True),
    types.StructField('tender_isCentralProcurement', types.StringType(), True),
    types.StructField('tender_isJointProcurement', types.StringType(), True),
    types.StructField('tender_isOnBehalfOf', types.StringType(), True),
    types.StructField('tender_isFrameworkAgreement', types.StringType(), True),
    types.StructField('tender_isCoveredByGpa', types.StringType(), True),
    types.StructField('tender_hasLots', types.StringType(), True),
    types.StructField('tender_estimatedPrice', types.FloatType(), True),
    types.StructField('tender_estimatedPrice_currency', types.StringType(), True),
    types.StructField('tender_estimatedPrice_minNetAmount', types.FloatType(), True),
    types.StructField('tender_estimatedPrice_maxNetAmount', types.FloatType(), True),
    types.StructField('tender_estimatedPrice_EUR', types.FloatType(), True),
    types.StructField('tender_finalPrice', types.FloatType(), True),
    types.StructField('tender_finalPrice_currency', types.StringType(), True),
    types.StructField('tender_finalPrice_minNetAmount', types.FloatType(), True),
    types.StructField('tender_finalPrice_maxNetAmount', types.FloatType(), True),
    types.StructField('tender_finalPrice_EUR', types.FloatType(), True),
    types.StructField('tender_description_length', types.StringType(), True),
    types.StructField('tender_personalRequirements_length', types.StringType(), True),
    types.StructField('tender_economicRequirements_length', types.StringType(), True),
    types.StructField('tender_technicalRequirements_length', types.StringType(), True),
    types.StructField('tender_documents_count', types.IntegerType(), True),
    types.StructField('tender_awardCriteria_count', types.IntegerType(), True),
    types.StructField('tender_corrections_count', types.IntegerType(), True),
    types.StructField('tender_onBehalfOf_count', types.IntegerType(), True),
    types.StructField('tender_lots_count', types.IntegerType(), True),
    types.StructField('tender_publications_count', types.IntegerType(), True),
    types.StructField('tender_publications_firstCallForTenderDate', types.TimestampType(), True),
    types.StructField('tender_publications_lastCallForTenderDate', types.TimestampType(), True),
    types.StructField('tender_publications_firstdContractAwardDate', types.TimestampType(), True),
    types.StructField('tender_publications_lastContractAwardDate', types.TimestampType(), True),
    types.StructField('tender_publications_lastContractAwardUrl', types.StringType(), True),
    types.StructField('tender_indicator_INTEGRITY_SINGLE_BID', types.StringType(), True),
    types.StructField('tender_indicator_INTEGRITY_CALL_FOR_TENDER_PUBLICATION', types.StringType(), True),
    types.StructField('tender_indicator_INTEGRITY_ADVERTISEMENT_PERIOD', types.StringType(), True),
    types.StructField('tender_indicator_INTEGRITY_PROCEDURE_TYPE', types.StringType(), True),
    types.StructField('tender_indicator_INTEGRITY_DECISION_PERIOD', types.StringType(), True),
    types.StructField('tender_indicator_INTEGRITY_TAX_HAVEN', types.StringType(), True),
    types.StructField('tender_indicator_INTEGRITY_NEW_COMPANY', types.StringType(), True),
    types.StructField('tender_indicator_ADMINISTRATIVE_CENTRALIZED_PROCUREMENT', types.StringType(), True),
    types.StructField('tender_indicator_ADMINISTRATIVE_ELECTRONIC_AUCTION', types.StringType(), True),
    types.StructField('tender_indicator_ADMINISTRATIVE_COVERED_BY_GPA', types.StringType(), True),
    types.StructField('tender_indicator_ADMINISTRATIVE_FRAMEWORK_AGREEMENT', types.StringType(), True),
    types.StructField('tender_indicator_ADMINISTRATIVE_ENGLISH_AS_FOREIGN_LANGUAGE', types.StringType(), True),
    types.StructField('tender_indicator_ADMINISTRATIVE_NOTICE_AND_AWARD_DISCREPANCIES', types.StringType(), True),
    types.StructField('tender_indicator_TRANSPARENCY_NUMBER_OF_KEY_MISSING_FIELDS', types.StringType(), True),
    types.StructField('tender_indicator_TRANSPARENCY_AWARD_DATE_MISSING', types.StringType(), True),
    types.StructField('tender_indicator_TRANSPARENCY_BUYER_NAME_MISSING', types.StringType(), True),
    types.StructField('tender_indicator_TRANSPARENCY_PROC_METHOD_MISSING', types.StringType(), True),
    types.StructField('tender_indicator_TRANSPARENCY_BUYER_LOC_MISSING', types.StringType(), True),
    types.StructField('tender_indicator_TRANSPARENCY_BIDDER_ID_MISSING', types.StringType(), True),
    types.StructField('tender_indicator_TRANSPARENCY_BIDDER_NAME_MISSING', types.StringType(), True),
    types.StructField('tender_indicator_TRANSPARENCY_MARKET_MISSING', types.StringType(), True),
    types.StructField('tender_indicator_TRANSPARENCY_TITLE_MISSING', types.StringType(), True),
    types.StructField('tender_indicator_TRANSPARENCY_VALUE_MISSING', types.StringType(), True),
    types.StructField('tender_indicator_TRANSPARENCY_YEAR_MISSING', types.StringType(), True),
    types.StructField('tender_indicator_INTEGRITY_WINNER_CA_SHARE', types.StringType(), True),
    types.StructField('tender_indicator_TRANSPARENCY_MISSING_ADDRESS_OF_IMPLEMENTATION_NUTS', types.StringType(), True),
    types.StructField('tender_indicator_TRANSPARENCY_MISSING_ELIGIBLE_BID_LANGUAGES', types.StringType(), True),
    types.StructField('tender_indicator_TRANSPARENCY_MISSING_OR_INCOMPLETE_AWARD_CRITERIA', types.StringType(), True),
    types.StructField('tender_indicator_TRANSPARENCY_MISSING_OR_INCOMPLETE_CPVS', types.StringType(), True),
    types.StructField('tender_indicator_TRANSPARENCY_MISSING_OR_INCOMPLETE_DURATION_INFO', types.StringType(), True),
    types.StructField('tender_indicator_TRANSPARENCY_MISSING_OR_INCOMPLETE_FUNDINGS_INFO', types.StringType(), True),
    types.StructField('tender_indicator_TRANSPARENCY_MISSING_SELECTION_METHOD', types.StringType(), True),
    types.StructField('tender_indicator_TRANSPARENCY_MISSING_SUBCONTRACTED_INFO', types.StringType(), True),
    types.StructField('buyer_row_nr', types.StringType(), True),
    types.StructField('buyer_buyerType', types.StringType(), True),
    types.StructField('buyer_mainActivities', types.StringType(), True),
    types.StructField('buyer_id', types.StringType(), True),
    types.StructField('buyer_name', types.StringType(), True),
    types.StructField('buyer_nuts', types.StringType(), True),
    types.StructField('buyer_city', types.StringType(), True),
    types.StructField('buyer_country', types.StringType(), True),
    types.StructField('buyer_postcode', types.StringType(), True),
    types.StructField('lot_row_nr', types.StringType(), True),
    types.StructField('lot_title', types.StringType(), True),
    types.StructField('lot_selectionMethod', types.StringType(), True),
    types.StructField('lot_status', types.StringType(), True),
    types.StructField('lot_contractSignatureDate', types.StringType(), True),
    types.StructField('lot_cancellationDate', types.StringType(), True),
    types.StructField('lot_isAwarded', types.StringType(), True),
    types.StructField('lot_estimatedPrice', types.FloatType(), True),
    types.StructField('lot_estimatedPrice_currency', types.StringType(), True),
    types.StructField('lot_estimatedPrice_minNetAmount', types.FloatType(), True),
    types.StructField('lot_estimatedPrice_maxNetAmount', types.FloatType(), True),
    types.StructField('lot_estimatedPrice_EUR', types.FloatType(), True),
    types.StructField('lot_lotNumber', types.StringType(), True),
    types.StructField('lot_bidsCount', types.IntegerType(), True),
    types.StructField('lot_validBidsCount', types.IntegerType(), True),
    types.StructField('lot_smeBidsCount', types.IntegerType(), True),
    types.StructField('lot_electronicBidsCount', types.IntegerType(), True),
    types.StructField('lot_nonEuMemberStatesCompaniesBidsCount', types.IntegerType(), True),
    types.StructField('lot_otherEuMemberStatesCompaniesBidsCount', types.IntegerType(), True),
    types.StructField('lot_foreignCompaniesBidsCount', types.IntegerType(), True),
    types.StructField('lot_description_length', types.StringType(), True),
    types.StructField('bid_row_nr', types.StringType(), True),
    types.StructField('bid_isWinning', types.StringType(), True),
    types.StructField('bid_isSubcontracted', types.StringType(), True),
    types.StructField('bid_isConsortium', types.StringType(), True),
    types.StructField('bid_price', types.FloatType(), True),
    types.StructField('bid_price_currency', types.StringType(), True),
    types.StructField('bid_price_minNetAmount', FloatType(), True),
    types.StructField('bid_price_maxNetAmount', FloatType(), True),
    types.StructField('bid_price_EUR', types.FloatType(), True),
    types.StructField('bidder_row_nr', types.StringType(), True),
    types.StructField('bidder_id', types.StringType(), True),
    types.StructField('bidder_name', types.StringType(), True),
    types.StructField('bidder_nuts', types.StringType(), True),
    types.StructField('bidder_city', types.StringType(), True),
    types.StructField('bidder_country', types.StringType(), True),
    types.StructField('bidder_postcode', types.StringType(), True)
])


In [5]:
df = spark.read \
    .option('header', 'true') \
    .option("delimiter", ";") \
    .schema(schema) \
    .csv('gs://dl-eu-pub-tender/raw_data/country_data/*')
   

22/04/15 10:20:43 WARN SQLConf: The SQL config 'spark.sql.legacy.parquet.int96RebaseModeInWrite' has been deprecated in Spark v3.2 and may be removed in the future. Use 'spark.sql.parquet.int96RebaseModeInWrite' instead.
22/04/15 10:20:43 WARN SQLConf: The SQL config 'spark.sql.legacy.parquet.datetimeRebaseModeInRead' has been deprecated in Spark v3.2 and may be removed in the future. Use 'spark.sql.parquet.datetimeRebaseModeInRead' instead.
22/04/15 10:20:43 WARN SQLConf: The SQL config 'spark.sql.legacy.parquet.int96RebaseModeInRead' has been deprecated in Spark v3.2 and may be removed in the future. Use 'spark.sql.parquet.int96RebaseModeInRead' instead.
22/04/15 10:20:43 WARN SQLConf: The SQL config 'spark.sql.legacy.parquet.datetimeRebaseModeInWrite' has been deprecated in Spark v3.2 and may be removed in the future. Use 'spark.sql.parquet.datetimeRebaseModeInWrite' instead.


In [6]:
def  boolean_conversion(base_str):
    if base_str == 'yes':
        return True
    elif base_str == 'no':
        return False
    else:
        return f'unknown'

boolean_conversion_udf = F.udf(boolean_conversion, returnType=types.BooleanType())

In [7]:
df = df \
    .withColumn('tender_date', F.to_date(df.tender_awardDecisionDate, 'yyyy-mm-dd')) \
    .withColumn('tender_year', F.year(df.tender_year)) \
    .withColumnRenamed('tender_supplyType', 'purchase_type') \
    .withColumnRenamed('tender_procedureType', 'procedure_type') \
    .withColumnRenamed('tender_finalPrice_EUR', 'final_price') \
    .withColumnRenamed('tender_awardCriteria_count', 'award_criteria_count') \
    .withColumnRenamed('buyer_buyerType', 'buyer_type') \
    .withColumnRenamed('buyer_mainActivities', 'buyers_activities') \
    .withColumn('eu_funded', boolean_conversion_udf(df.tender_isEUFunded)) \
    .select('tender_id', 'eu_funded', 'tender_year', 'tender_date', 'tender_country','buyer_name', 'buyer_type', 'buyers_activities', 'purchase_type', 'procedure_type','award_criteria_count', 'bidder_name', 'final_price' )

22/04/15 10:21:12 WARN SQLConf: The SQL config 'spark.sql.legacy.parquet.int96RebaseModeInWrite' has been deprecated in Spark v3.2 and may be removed in the future. Use 'spark.sql.parquet.int96RebaseModeInWrite' instead.
22/04/15 10:21:12 WARN SQLConf: The SQL config 'spark.sql.legacy.parquet.datetimeRebaseModeInRead' has been deprecated in Spark v3.2 and may be removed in the future. Use 'spark.sql.parquet.datetimeRebaseModeInRead' instead.
22/04/15 10:21:12 WARN SQLConf: The SQL config 'spark.sql.legacy.parquet.int96RebaseModeInRead' has been deprecated in Spark v3.2 and may be removed in the future. Use 'spark.sql.parquet.int96RebaseModeInRead' instead.
22/04/15 10:21:12 WARN SQLConf: The SQL config 'spark.sql.legacy.parquet.datetimeRebaseModeInWrite' has been deprecated in Spark v3.2 and may be removed in the future. Use 'spark.sql.parquet.datetimeRebaseModeInWrite' instead.


In [8]:
df.repartition(48)

DataFrame[tender_id: string, eu_funded: boolean, tender_year: int, tender_date: date, tender_country: string, buyer_name: string, buyer_type: string, buyers_activities: string, purchase_type: string, procedure_type: string, award_criteria_count: int, bidder_name: string, final_price: float]

In [9]:
df.write.parquet('gs://dl-eu-pub-tender/raw_data/parquet', mode = 'overwrite')

                                                                                

In [44]:
dfPar = spark.read.parquet('gs://dl-eu-pub-tender/raw_data/parquet')

In [45]:
dfPar.createOrReplaceTempView('data')

In [10]:
# Show 15 largest public tender suppliers in EU
df_largest_suppliers = spark.sql("""
SELECT bidder_name, sum(final_price) AS value
FROM data
WHERE bidder_name IS NOT NULL
GROUP BY bidder_name
ORDER BY sum(final_price) DESC
LIMIT 15;
""")



+--------------------+--------------------+
|         bidder_name|               value|
+--------------------+--------------------+
|BG TEVA GmbH, rat...|         2.092538E12|
|   ALIUD PHARMA GmbH|    9.82911731748E11|
|     1 A Pharma GmbH|    7.82850600616E11|
|  Aristo Pharma GmbH|     5.4316061316E11|
|BG Heumann Pharma...|          4.37472E11|
|BG PUREN Pharma G...|          4.37472E11|
|            Hexal AG|    3.92045495996E11|
|    GALENpharma GmbH|    3.89219745296E11|
|neuraxpharm Arzne...|    3.42314004356E11|
|BG Zentiva Pharma...|          2.91648E11|
|Glenmark Arzneimi...|    2.01546874736E11|
|         Basics GmbH|1.963134267834375E11|
|     AAA-Pharma GmbH|    1.47546933764E11|
|         TEVA ITALIA|     1.1216921136E11|
|AIESI HOSPITAL SE...|    1.11437296291E11|
+--------------------+--------------------+



                                                                                

In [11]:
# Show 25 sectors, where EU spends most
df_largest_sectors = spark.sql("""
SELECT buyers_activities, sum(final_price) AS value
FROM data
WHERE buyers_activities IS NOT NULL
GROUP BY buyers_activities
ORDER BY sum(final_price) DESC
LIMIT 25;
""").show()



+--------------------+--------------------+
|   buyers_activities|               value|
+--------------------+--------------------+
|              HEALTH|1.392437537436630...|
|HEALTH,OTHER,GENE...|1.182682029617625...|
|GENERAL_PUBLIC_SE...|1.678776219599417...|
|               OTHER|1.183263491195017...|
|        HEALTH,OTHER|8.420023862121294E11|
|OTHER,GENERAL_PUB...|8.092877406141925E11|
|GENERAL_PUBLIC_SE...| 6.22540863347945E11|
|HEALTH,GENERAL_PU...|5.969822360742549E11|
|GENERAL_PUBLIC_SE...|5.254400545768906E11|
|OTHER,ENVIRONMENT...|2.489289820332660...|
|GENERAL_PUBLIC_SE...|2.445128217288515...|
|           EDUCATION|2.358245051329227...|
|             DEFENCE|1.792321960181112...|
|HOUSING_AND_COMMU...|1.580622228752117...|
|HOUSING_AND_COMMU...|1.500738696063661E11|
|HEALTH,GENERAL_PU...|1.265497186235592...|
|PUBLIC_ORDER_AND_...|1.208172822787124...|
|         ELECTRICITY|1.135620294280568...|
|HEALTH,OTHER,HOUS...|1.077169679170937...|
|GENERAL_PUBLIC_SE...|1.05089727

                                                                                

In [12]:
# Show purchases by procedure type
df_procedures = spark.sql("""
SELECT procedure_type, sum(final_price) AS amount
FROM data
GROUP BY procedure_type
ORDER BY sum(final_price) DESC
LIMIT 12;
""").show()



+--------------------+--------------------+
|      procedure_type|              amount|
+--------------------+--------------------+
|                OPEN|3.703694820816669E13|
|          RESTRICTED|9.072365150053544E11|
|NEGOTIATED_WITH_P...|8.352770879332925E11|
|NEGOTIATED_WITHOU...|4.951850850196092E11|
|          NEGOTIATED|3.547498204683302E11|
|  COMPETITIVE_DIALOG|9.077439841323251E10|
|                null| 5.01239254870525E10|
| APPROACHING_BIDDERS|3.703755632541736E10|
|INOVATION_PARTNER...|1.489559890273312...|
|               OTHER| 9.242293528452265E9|
|      OUTRIGHT_AWARD|3.8540221732375107E9|
|          MINITENDER|1.1392184837360263E9|
|   Lot 4 - DRDP Iasi|      4.5520667875E8|
| Lotto n. 5: 0600...|         4.9800076E7|
|      PUBLIC_CONTEST|         4.3445801E7|
| 4.Обособена пози...|          1.893635E7|
| Construire centr...|         1.8848958E7|
| Обособена позици...|          1.531716E7|
| Обособена позици...|         1.4640822E7|
| ЛОТ 4- райони Бя...|          

                                                                                

In [16]:
# Show tender values by country over years
df_values_by_country = spark.sql("""
SELECT tender_country, tender_year, sum(final_price) AS value
FROM data
GROUP BY tender_country, tender_year
ORDER BY sum(final_price) DESC;
""").show()



+--------------+-----------+--------------------+
|tender_country|tender_year|               value|
+--------------+-----------+--------------------+
|            IT|       2016|1.228789393257906...|
|            DE|       2016|9.886196574122207E12|
|            FR|       2017|8.020394195745898E11|
|            FR|       2019|7.826629803352505E11|
|            FR|       2018|6.614133934468223E11|
|            FR|       2016|5.684921082847106E11|
|            LT|       2015|4.549899950147812...|
|            PL|       2019|4.316972988045312...|
|            IT|       2018| 3.95641994436104E11|
|            PL|       2018|3.528840293357969E11|
|            ES|       2018|3.105747164751419E11|
|            FR|       2020|3.005226664332304...|
|            IT|       2019|2.953583413658703...|
|            IT|       2015|2.953137557538583...|
|            PL|       2017|2.803455343799687...|
|            RO|       2017|2.754602780603906E11|
|            IT|       2017|2.442633661429955...|


                                                                                

In [17]:
# Show largest public tender ever
df_largest_tender = spark.sql("""
SELECT tender_date, tender_year, buyer_name, buyers_activities, bidder_name, final_price
FROM data
ORDER BY final_price DESC
LIMIT 1;
""").show()



+-----------+-----------+--------------------+--------------------+-----------+-------------+
|tender_date|tender_year|          buyer_name|   buyers_activities|bidder_name|  final_price|
+-----------+-----------+--------------------+--------------------+-----------+-------------+
|       null|       2016|Azienda Ospedalie...|HEALTH,OTHER,GENE...|     Abbvie|1.09164569E11|
+-----------+-----------+--------------------+--------------------+-----------+-------------+



                                                                                

In [30]:
# Show how largest buyer revenues develop over years
df_largest_bidders_revenues = spark.sql("""
SELECT bidder_name, tender_year, sum(final_price) AS revenue
FROM data
GROUP BY bidder_name, tender_year
ORDER BY sum(final_price) DESC
LIMIT 15;
""")

In [31]:
df_largest_bidders_revenues.head()

[Stage 27:>                                                       (4 + 4) / 330]

AttributeError: 'NoneType' object has no attribute 'coalesce'

In [66]:
# Show largest public tender ever
df_largest_tender = spark.sql("""
SELECT  sum(final_price) AS revenue, bidder_name, tender_year
FROM data
WHERE (
tender_year IS NOT NULL
and
bidder_name is not null)
GROUP BY bidder_name, tender_year
ORDER BY revenue DESC
LIMIT 50;
""").show(150)

22/04/15 12:35:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/15 12:35:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/15 12:35:08 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/15 12:35:15 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.

+------------------+--------------------+-----------+
|           revenue|         bidder_name|tender_year|
+------------------+--------------------+-----------+
|       2.092538E12|BG TEVA GmbH, rat...|       2016|
|  9.76212612048E11|   ALIUD PHARMA GmbH|       2016|
|  7.82850600616E11|     1 A Pharma GmbH|       2016|
|  5.36139022864E11|  Aristo Pharma GmbH|       2016|
|        4.37472E11|BG PUREN Pharma G...|       2016|
|        4.37472E11|BG Heumann Pharma...|       2016|
|  3.92045016664E11|            Hexal AG|       2016|
|  3.89217446296E11|    GALENpharma GmbH|       2016|
|  3.40732359728E11|neuraxpharm Arzne...|       2016|
|        2.91648E11|BG Zentiva Pharma...|       2016|
|  1.99337815432E11|Glenmark Arzneimi...|       2016|
|  1.95386018456E11|         Basics GmbH|       2016|
|  1.46184343296E11|     AAA-Pharma GmbH|       2016|
|1.0962023711875E11|              Abbvie|       2016|
|  1.09410950164E11|          MSD ITALIA|       2016|
|  1.09392155698E11|        



In [70]:
df_c = spark.sql("""
WITH largest_bidders As
(SELECT tender_country, sum(final_price)
FROM data
GROUP BY tender_country)

SELECT tender_year, data.tender_country, sum(final_price)
from data
join largest_bidders
on largest_bidders.tender_country = data.tender_country
where tender_year is not null
Group by tender_year, data.tender_country;
""")

[Stage 71:>                                                         (0 + 1) / 1]

+-----------+--------------+--------------------+
|tender_year|tender_country|    sum(final_price)|
+-----------+--------------+--------------------+
|       2017|            RO|2.754602780603906E11|
|       2018|            RO|1.138871757598503...|
|       2019|            RO|2.542762587666996E10|
|       2020|            RO|1.321920690842858...|
|       2016|            RO| 1.61881352194375E11|
|       2015|            RO|  1.9814936524575E11|
|       2013|            RO|1.540934407952031...|
|       2010|            RO|3.237844187124218...|
|       2009|            RO|1.631532532364453...|
|       2012|            RO|1.875654783468828E11|
|       2011|            RO|1.184161238118593...|
|       2014|            RO|2.255745434481718...|
|       2020|            FI|2.586965495451562...|
|       2016|            LT|1.061778788957968...|
|       2017|            LT|2.705848238554687...|
|       2014|            LT|1.121908423859531...|
|       2009|            LT|  1.54997946221875E9|


                                                                                

In [75]:
df_b = spark.sql("""
WITH largest_bidders As
(SELECT bidder_name, sum(final_price)
FROM data
GROUP BY bidder_name
order by sum(final_price) desc
limit 15)

SELECT tender_year, data.bidder_name, sum(final_price)
from data
join largest_bidders
on largest_bidders.bidder_name = data.bidder_name
where (data.tender_year is not null
and
data.final_price is not null)
Group by tender_year, data.bidder_name;
""").show(50)

22/04/15 12:51:35 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/15 12:51:35 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
22/04/15 12:51:39 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
                                                                                

+-----------+--------------------+----------------+
|tender_year|         bidder_name|sum(final_price)|
+-----------+--------------------+----------------+
|       2016|         TEVA ITALIA|1.09164568576E11|
|       2015|         TEVA ITALIA|   3.004642784E9|
|       2009|            Hexal AG|        270332.0|
|       2018|     AAA-Pharma GmbH|       2652000.0|
|       2018|   ALIUD PHARMA GmbH|        3.4476E7|
|       2020|     AAA-Pharma GmbH|    1.05563306E9|
|       2019|  Aristo Pharma GmbH|   4.009599008E9|
|       2018|neuraxpharm Arzne...|        1.5912E7|
|       2020|  Aristo Pharma GmbH|   2.961422288E9|
|       2020|Glenmark Arzneimi...|    8.47801968E8|
|       2020|   ALIUD PHARMA GmbH|   2.753591396E9|
|       2019|Glenmark Arzneimi...|   1.354360336E9|
|       2018|         Basics GmbH|         1.326E7|
|       2020|neuraxpharm Arzne...|    2.17247292E8|
|       2020|         Basics GmbH|    4.27431984E8|
|       2019|     AAA-Pharma GmbH|    2.97408408E8|
|       2019