Commit

Merge branch 'main' into COST-4391-aws-units-consistency

bacciotti committed Dec 12, 2023
2 parents 0315929 + 9999949 commit da6a3e4
Showing 18 changed files with 143 additions and 121 deletions.
@@ -0,0 +1,16 @@
# Generated by Django 3.2.22 on 2023-12-05 14:38
from django.db import migrations


class Migration(migrations.Migration):

    dependencies = [
        ("api", "0060_provider_polling_timestamp"),
    ]

    operations = [
        migrations.AlterUniqueTogether(
            name="providerinfrastructuremap",
            unique_together={("infrastructure_type", "infrastructure_provider")},
        ),
    ]
3 changes: 3 additions & 0 deletions koku/api/provider/models.py
@@ -568,5 +568,8 @@ class ProviderInfrastructureMap(models.Model):
associated provider the cluster is installed on.
"""

    class Meta:
        unique_together = ("infrastructure_type", "infrastructure_provider")

    infrastructure_type = models.CharField(max_length=50, choices=Provider.CLOUD_PROVIDER_CHOICES, blank=False)
    infrastructure_provider = models.ForeignKey("Provider", on_delete=models.CASCADE)
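The new migration and this Meta addition enforce a single mapping row per (infrastructure_type, infrastructure_provider) pair at the database level. A minimal sketch of what callers can now rely on (hypothetical helper, not part of this commit):

```python
from api.provider.models import ProviderInfrastructureMap


def map_infrastructure(infra_type, infra_provider):
    """With the unique constraint in place, get_or_create cannot leave duplicates behind."""
    mapping, _created = ProviderInfrastructureMap.objects.get_or_create(
        infrastructure_type=infra_type,
        infrastructure_provider=infra_provider,
    )
    return mapping
```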
@@ -1,6 +1,6 @@

WITH cte_azure_instances AS (
SELECT DISTINCT split_part(coalesce(azure.resourceid, azure.instanceid), '/', 9) as instance,
SELECT DISTINCT split_part(coalesce(nullif(azure.resourceid, ''), azure.instanceid), '/', 9) as instance,
azure.source
FROM hive.{{schema | sqlsafe}}.azure_line_items AS azure
WHERE coalesce(azure.date, azure.usagedatetime) >= {{start_date}}
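The recurring change in this and the other Azure Trino SQL templates is wrapping the first coalesce argument in nullif, so an empty string (or a zero, for the numeric quantity and cost columns) falls through to the fallback column instead of being returned as-is. A rough Python illustration of the semantics (not code from this repository):

```python
def first_non_empty(*values):
    """Approximate coalesce(nullif(v, ''), fallback): skip NULLs and empty strings."""
    for value in values:
        if value not in (None, ""):
            return value
    return None


# coalesce(resourceid, instanceid) returned "" when resourceid was present but blank;
# coalesce(nullif(resourceid, ''), instanceid) now falls back to instanceid instead.
assert first_non_empty("", "instance-id-value") == "instance-id-value"
```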
@@ -21,17 +21,17 @@ INSERT INTO postgres.{{schema | sqlsafe}}.reporting_azurecostentrylineitem_daily
WITH cte_line_items AS (
SELECT date(coalesce(date, usagedatetime)) as usage_date,
INTEGER '{{bill_id | sqlsafe}}' as cost_entry_bill_id,
coalesce(subscriptionid, subscriptionguid) as subscription_guid,
coalesce(nullif(subscriptionid, ''), subscriptionguid) as subscription_guid,
resourcelocation as resource_location,
coalesce(servicename, metercategory) as service_name,
coalesce(nullif(servicename, ''), metercategory) as service_name,
json_extract_scalar(json_parse(additionalinfo), '$.ServiceType') as instance_type,
cast(coalesce(quantity, usagequantity) as DECIMAL(24,9)) as usage_quantity,
cast(coalesce(costinbillingcurrency, pretaxcost) as DECIMAL(24,9)) as pretax_cost,
coalesce(billingcurrencycode, currency, billingcurrency) as currency,
cast(coalesce(nullif(quantity, 0), usagequantity) as DECIMAL(24,9)) as usage_quantity,
cast(coalesce(nullif(costinbillingcurrency, 0), pretaxcost) as DECIMAL(24,9)) as pretax_cost,
coalesce(nullif(billingcurrencycode, ''), nullif(currency, ''), billingcurrency) as currency,
json_parse(tags) as tags,
coalesce(resourceid, instanceid) as instance_id,
coalesce(nullif(resourceid, ''), instanceid) as instance_id,
cast(source as UUID) as source_uuid,
coalesce(subscriptionname, subscriptionid, subscriptionguid) as subscription_name,
coalesce(nullif(subscriptionname, ''), nullif(subscriptionid, ''), subscriptionguid) as subscription_name,
CASE
WHEN regexp_like(split_part(unitofmeasure, ' ', 1), '^\d+(\.\d+)?$') AND NOT (unitofmeasure = '100 Hours' AND metercategory='Virtual Machines') AND NOT split_part(unitofmeasure, ' ', 2) = ''
THEN cast(split_part(unitofmeasure, ' ', 1) as INTEGER)
@@ -149,10 +149,10 @@ INSERT INTO hive.{{schema | sqlsafe}}.azure_openshift_daily_resource_matched_tem
)
SELECT cast(uuid() as varchar) as uuid,
coalesce(azure.date, azure.usagedatetime) as usage_start,
split_part(coalesce(resourceid, instanceid), '/', 9) as resource_id,
coalesce(servicename, metercategory) as service_name,
split_part(coalesce(nullif(resourceid, ''), instanceid), '/', 9) as resource_id,
coalesce(nullif(servicename, ''), metercategory) as service_name,
max(json_extract_scalar(json_parse(azure.additionalinfo), '$.ServiceType')) as instance_type,
coalesce(azure.subscriptionid, azure.subscriptionguid) as subscription_guid,
coalesce(nullif(azure.subscriptionid, ''), azure.subscriptionguid) as subscription_guid,
azure.resourcelocation as resource_location,
max(CASE
WHEN split_part(unitofmeasure, ' ', 2) = 'Hours'
@@ -163,9 +163,9 @@ SELECT cast(uuid() as varchar) as uuid,
THEN split_part(unitofmeasure, ' ', 2)
ELSE unitofmeasure
END) as unit_of_measure,
sum(coalesce(azure.quantity, azure.usagequantity)) as usage_quantity,
coalesce(azure.billingcurrencycode, azure.currency) as currency,
sum(coalesce(azure.costinbillingcurrency, azure.pretaxcost)) as pretax_cost,
sum(coalesce(nullif(azure.quantity, 0), azure.usagequantity)) as usage_quantity,
coalesce(nullif(azure.billingcurrencycode, ''), azure.currency) as currency,
sum(coalesce(nullif(azure.costinbillingcurrency, 0), azure.pretaxcost)) as pretax_cost,
azure.tags,
max(azure.resource_id_matched) as resource_id_matched,
{{ocp_source_uuid}} as ocp_source,
@@ -179,11 +179,11 @@ WHERE azure.source = {{azure_source_uuid}}
AND coalesce(azure.date, azure.usagedatetime) < date_add('day', 1, {{end_date}})
AND azure.resource_id_matched = TRUE
GROUP BY coalesce(azure.date, azure.usagedatetime),
split_part(coalesce(resourceid, instanceid), '/', 9),
coalesce(servicename, metercategory),
coalesce(subscriptionid, subscriptionguid),
split_part(coalesce(nullif(resourceid, ''), instanceid), '/', 9),
coalesce(nullif(servicename, ''), metercategory),
coalesce(nullif(subscriptionid, ''), subscriptionguid),
azure.resourcelocation,
coalesce(azure.billingcurrencycode, azure.currency),
coalesce(nullif(azure.billingcurrencycode, ''), azure.currency),
azure.tags
;

@@ -219,9 +219,9 @@ WITH cte_enabled_tag_keys AS (
SELECT cast(uuid() as varchar) as uuid,
coalesce(azure.date, azure.usagedatetime) as usage_start,
split_part(coalesce(resourceid, instanceid), '/', 9) as resource_id,
coalesce(servicename, metercategory) as service_name,
coalesce(nullif(servicename, ''), metercategory) as service_name,
max(json_extract_scalar(json_parse(azure.additionalinfo), '$.ServiceType')) as instance_type,
coalesce(azure.subscriptionid, azure.subscriptionguid) as subscription_guid,
coalesce(nullif(azure.subscriptionid, ''), azure.subscriptionguid) as subscription_guid,
azure.resourcelocation as resource_location,
max(CASE
WHEN split_part(unitofmeasure, ' ', 2) = 'Hours'
@@ -232,9 +232,9 @@ SELECT cast(uuid() as varchar) as uuid,
THEN split_part(unitofmeasure, ' ', 2)
ELSE unitofmeasure
END) as unit_of_measure,
sum(coalesce(azure.quantity, azure.usagequantity)) as usage_quantity,
coalesce(azure.billingcurrencycode, azure.currency) as currency,
sum(coalesce(azure.costinbillingcurrency, azure.pretaxcost)) as pretax_cost,
sum(coalesce(nullif(azure.quantity, 0), azure.usagequantity)) as usage_quantity,
coalesce(nullif(azure.billingcurrencycode, ''), azure.currency) as currency,
sum(coalesce(nullif(azure.costinbillingcurrency, 0), azure.pretaxcost)) as pretax_cost,
json_format(
cast(
map_filter(
@@ -257,10 +257,10 @@ WHERE azure.source = {{azure_source_uuid}}
AND (azure.resource_id_matched = FALSE OR azure.resource_id_matched IS NULL)
GROUP BY coalesce(azure.date, azure.usagedatetime),
split_part(coalesce(resourceid, instanceid), '/', 9),
coalesce(servicename, metercategory),
coalesce(subscriptionid, subscriptionguid),
coalesce(nullif(servicename, ''), metercategory),
coalesce(nullif(subscriptionid, ''), subscriptionguid),
azure.resourcelocation,
coalesce(azure.billingcurrencycode, azure.currency),
coalesce(nullif(azure.billingcurrencycode, ''), azure.currency),
12, -- tags
azure.matched_tag
;
2 changes: 1 addition & 1 deletion koku/masu/external/downloader/aws/aws_report_downloader.py
@@ -118,7 +118,7 @@ def create_daily_archives(
local_file,
chunksize=settings.PARQUET_PROCESSING_BATCH_SIZE,
usecols=lambda x: x in use_cols,
dtype="str",
dtype=pd.StringDtype(storage="pyarrow"),
) as reader:
for i, data_frame in enumerate(reader):
if data_frame.empty:
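Each downloader that read CSVs with dtype="str" now passes a pyarrow-backed string dtype instead, so string columns keep a real string type (with pd.NA for missing values) rather than becoming object columns holding float NaN. A small sketch of the difference, assuming a recent pandas with pyarrow installed:

```python
import io

import pandas as pd

csv_data = "resourceid,cost\n,1.0\nid-1,2.0\n"

old = pd.read_csv(io.StringIO(csv_data), dtype="str")
new = pd.read_csv(io.StringIO(csv_data), dtype=pd.StringDtype(storage="pyarrow"))

print(old["resourceid"].dtype)  # object; the blank field became float('nan')
print(new["resourceid"].dtype)  # string[pyarrow]; the blank field became pd.NA
```

The same one-line change is applied to the Azure, GCP, and OCI downloaders and to the Kafka message handler below.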
@@ -109,7 +109,10 @@ def create_daily_archives(
{"UsageDateTime", "Date", "date", "usagedatetime"}
)[0]
with pd.read_csv(
local_file, chunksize=settings.PARQUET_PROCESSING_BATCH_SIZE, parse_dates=[time_interval], dtype="str"
local_file,
chunksize=settings.PARQUET_PROCESSING_BATCH_SIZE,
parse_dates=[time_interval],
dtype=pd.StringDtype(storage="pyarrow"),
) as reader:
for i, data_frame in enumerate(reader):
if data_frame.empty:
2 changes: 1 addition & 1 deletion koku/masu/external/downloader/gcp/gcp_report_downloader.py
@@ -52,7 +52,7 @@ class GCPReportDownloaderError(Exception):

def pd_read_csv(local_file_path):
try:
return pd.read_csv(local_file_path, dtype="str")
return pd.read_csv(local_file_path, dtype=pd.StringDtype(storage="pyarrow"))
except Exception as error:
LOG.error(log_json(msg="file could not be parsed", file_path=local_file_path), exc_info=error)
raise GCPReportDownloaderError(error)
2 changes: 1 addition & 1 deletion koku/masu/external/downloader/oci/oci_report_downloader.py
@@ -40,7 +40,7 @@ def divide_csv_monthly(file_path, filename):
directory = os.path.dirname(file_path)

try:
data_frame = pd.read_csv(file_path, dtype="str")
data_frame = pd.read_csv(file_path, dtype=pd.StringDtype(storage="pyarrow"))
except Exception as error:
LOG.error(f"File {file_path} could not be parsed. Reason: {error}")
raise error
2 changes: 1 addition & 1 deletion koku/masu/external/kafka_msg_handler.py
@@ -75,7 +75,7 @@ def divide_csv_daily(file_path: os.PathLike, manifest_id: int):
daily_files = []

try:
data_frame = pd.read_csv(file_path, dtype="str")
data_frame = pd.read_csv(file_path, dtype=pd.StringDtype(storage="pyarrow"))
except Exception as error:
LOG.error(f"File {file_path} could not be parsed. Reason: {str(error)}")
raise error
5 changes: 2 additions & 3 deletions koku/masu/test/util/aws/test_common.py
@@ -13,7 +13,6 @@
from uuid import uuid4

import boto3
import numpy as np
import pandas as pd
from botocore.exceptions import ClientError
from dateutil.relativedelta import relativedelta
@@ -755,7 +754,7 @@ def test_match_openshift_labels_with_nan_resources(self):

matched_tags = [{"key": "value"}]
data = [
{"lineitem_resourceid": np.nan, "lineitem_unblendedcost": 1, "resourcetags": '{"key": "value"}'},
{"lineitem_resourceid": "", "lineitem_unblendedcost": 1, "resourcetags": '{"key": "value"}'},
]

df = pd.DataFrame(data)
@@ -779,7 +778,7 @@ def test_match_openshift_resource_with_nan_labels(self):

matched_tags = [{"key": "value"}]
data = [
{"lineitem_resourceid": "id1", "lineitem_unblendedcost": 1, "resourcetags": np.nan},
{"lineitem_resourceid": "id1", "lineitem_unblendedcost": 1, "resourcetags": ""},
]

df = pd.DataFrame(data)
6 changes: 3 additions & 3 deletions koku/masu/test/util/azure/test_azure_post_processor.py
@@ -16,7 +16,7 @@
from masu.util.azure.azure_post_processor import AzurePostProcessor
from masu.util.azure.common import INGRESS_REQUIRED_COLUMNS
from reporting.provider.all.models import EnabledTagKeys
from reporting.provider.azure.models import TRINO_COLUMNS
from reporting.provider.azure.models import TRINO_REQUIRED_COLUMNS


class TestAzurePostProcessor(MasuTestCase):
@@ -44,9 +44,9 @@ def test_azure_process_dataframe(self):
result, _ = self.post_processor.process_dataframe(df)
columns = list(result)
expected_columns = sorted(
col.replace("-", "_").replace("/", "_").replace(":", "_").lower() for col in TRINO_COLUMNS
col.replace("-", "_").replace("/", "_").replace(":", "_").lower() for col in TRINO_REQUIRED_COLUMNS
)
self.assertEqual(columns, expected_columns)
self.assertEqual(sorted(columns), sorted(expected_columns))

def test_azure_date_converter(self):
"""Test that we convert the new Azure date format."""
9 changes: 4 additions & 5 deletions koku/masu/test/util/azure/test_common.py
@@ -3,7 +3,6 @@
# Copyright 2021 Red Hat Inc.
# SPDX-License-Identifier: Apache-2.0
#
import numpy as np
import pandas as pd
from django_tenants.utils import schema_context

@@ -116,9 +115,9 @@ def test_match_openshift_resources_and_labels_resource_nan(self):
]
matched_tags = []
data = [
{"resourceid": np.nan, "instanceid": "id1", "pretaxcost": 1, "tags": '{"key": "value"}'},
{"resourceid": np.nan, "instanceid": "id2", "pretaxcost": 1, "tags": '{"key": "other_value"}'},
{"resourceid": np.nan, "instanceid": "id3", "pretaxcost": 1, "tags": '{"keyz": "value"}'},
{"resourceid": "", "instanceid": "id1", "pretaxcost": 1, "tags": '{"key": "value"}'},
{"resourceid": "", "instanceid": "id2", "pretaxcost": 1, "tags": '{"key": "other_value"}'},
{"resourceid": "", "instanceid": "id3", "pretaxcost": 1, "tags": '{"keyz": "value"}'},
]

df = pd.DataFrame(data)
@@ -148,7 +147,7 @@ def test_match_openshift_resource_with_nan_labels(self):

matched_tags = [{"key": "value"}]
data = [
{"resourceid": "id1", "pretaxcost": 1, "tags": np.nan},
{"resourceid": "id1", "pretaxcost": 1, "tags": ""},
]

df = pd.DataFrame(data)
6 changes: 3 additions & 3 deletions koku/masu/util/aws/common.py
@@ -880,14 +880,14 @@ def match_openshift_resources_and_labels(data_frame, cluster_topologies, matched
resource_ids = tuple(resource_ids)
data_frame["resource_id_matched"] = False
resource_id_df = data_frame["lineitem_resourceid"]
if not resource_id_df.isna().values.all():
if not resource_id_df.eq("").all():
LOG.info("Matching OpenShift on AWS by resource ID.")
resource_id_matched = resource_id_df.str.endswith(resource_ids)
data_frame["resource_id_matched"] = resource_id_matched

data_frame["special_case_tag_matched"] = False
tags = data_frame["resourcetags"]
if not tags.isna().values.all():
if not tags.eq("").all():
tags = tags.str.lower()
LOG.info("Matching OpenShift on AWS by tags.")
special_case_tag_matched = tags.str.contains(
@@ -903,7 +903,7 @@ tag_values.extend(list(tag.values()))
tag_values.extend(list(tag.values()))

any_tag_matched = None
if not tags.isna().values.all():
if not tags.eq("").all():
tag_matched = tags.str.contains("|".join(tag_keys)) & tags.str.contains("|".join(tag_values))
data_frame["tag_matched"] = tag_matched
any_tag_matched = tag_matched.any()
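In the AWS matcher above (and the Azure matcher further down), the guards that skip matching when a column is entirely missing switch from Series.isna() to comparing against the empty string, mirroring the tests where np.nan placeholders became "". Illustrative behaviour, assuming missing values have already been normalized to empty strings upstream:

```python
import pandas as pd

resource_ids = pd.Series(["", "", "i-abc123"], dtype=pd.StringDtype(storage="pyarrow"))

# Old guard: only skips when every value is NA, which never triggers once blanks are "".
print(resource_ids.isna().values.all())  # False

# New guard: skips when every value is the empty string.
print(resource_ids.eq("").all())  # False here; True when no resource IDs are present
```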
13 changes: 7 additions & 6 deletions koku/masu/util/azure/azure_post_processor.py
@@ -1,4 +1,5 @@
import json
import logging

import ciso8601
import pandas
@@ -10,7 +11,9 @@
from masu.util.common import populate_enabled_tag_rows_with_limit
from masu.util.common import safe_float
from masu.util.common import strip_characters_from_column_name
from reporting.provider.azure.models import TRINO_COLUMNS
from reporting.provider.azure.models import TRINO_REQUIRED_COLUMNS

LOG = logging.getLogger(__name__)


def azure_json_converter(tag_str):
@@ -101,11 +104,9 @@ def process_dataframe(self, data_frame):

data_frame = data_frame.rename(columns=column_name_map)

columns = set(data_frame)
columns = set(TRINO_COLUMNS).union(columns)
columns = sorted(columns)

data_frame = data_frame.reindex(columns=columns)
missing = set(TRINO_REQUIRED_COLUMNS).difference(data_frame)
to_add = {k: TRINO_REQUIRED_COLUMNS[k] for k in missing}
data_frame = data_frame.assign(**to_add)

unique_tags = set()
for tags_json in data_frame["tags"].values:
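process_dataframe no longer reindexes against the full TRINO_COLUMNS list; it now adds only the required columns that are missing, with defaults taken from the TRINO_REQUIRED_COLUMNS mapping. A condensed sketch of the new logic (the mapping shown here is an assumed stand-in; the real one lives in reporting.provider.azure.models):

```python
import pandas as pd

# Assumed shape: column name -> default value for rows that lack it.
TRINO_REQUIRED_COLUMNS = {"resourceid": "", "quantity": 0.0, "tags": ""}

data_frame = pd.DataFrame({"resourceid": ["id-1"], "quantity": [1.5]})

missing = set(TRINO_REQUIRED_COLUMNS).difference(data_frame)
data_frame = data_frame.assign(**{col: TRINO_REQUIRED_COLUMNS[col] for col in missing})

print(sorted(data_frame.columns))  # ['quantity', 'resourceid', 'tags']
```

Because the frame is no longer reindexed into sorted column order, the companion test above now compares sorted copies of the column lists.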
8 changes: 4 additions & 4 deletions koku/masu/util/azure/common.py
@@ -181,17 +181,17 @@ def match_openshift_resources_and_labels(data_frame, cluster_topologies, matched
matchable_resources = list(nodes) + list(volumes)
data_frame["resource_id_matched"] = False
resource_id_df = data_frame["resourceid"]
if resource_id_df.isna().values.all():
if resource_id_df.eq("").all():
resource_id_df = data_frame["instanceid"]

if not resource_id_df.isna().values.all():
if not resource_id_df.eq("").all():
LOG.info("Matching OpenShift on Azure by resource ID.")
resource_id_matched = resource_id_df.str.contains("|".join(matchable_resources))
data_frame["resource_id_matched"] = resource_id_matched

data_frame["special_case_tag_matched"] = False
tags = data_frame["tags"]
if not tags.isna().values.all():
if not tags.eq("").all():
tags = tags.str.lower()
LOG.info("Matching OpenShift on Azure by tags.")
special_case_tag_matched = tags.str.contains(
@@ -207,7 +207,7 @@ tag_values.extend(list(tag.values()))
tag_values.extend(list(tag.values()))

any_tag_matched = None
if not tags.isna().values.all():
if not tags.eq("").all():
tag_matched = tags.str.contains("|".join(tag_keys)) & tags.str.contains("|".join(tag_values))
data_frame["tag_matched"] = tag_matched
any_tag_matched = tag_matched.any()
2 changes: 1 addition & 1 deletion koku/masu/util/ocp/ocp_post_processor.py
@@ -146,7 +146,7 @@ def _generate_daily_data(self, data_frame):
new_cols = report.get("new_required_columns")
for col in new_cols:
if col not in daily_data_frame:
daily_data_frame[col] = None
daily_data_frame[col] = pd.Series(dtype=pd.StringDtype(storage="pyarrow"))

return daily_data_frame
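Newly required OCP columns are now created as empty pyarrow-backed string Series instead of None, keeping their dtype consistent with the CSV readers above rather than introducing object columns of None. For example (illustrative frame and column name, not from the repository):

```python
import pandas as pd

daily_data_frame = pd.DataFrame({"namespace": ["ns-1", "ns-2"]})

# Old: daily_data_frame["new_column"] = None  -> object column of None values.
# New: an empty string-typed Series aligns on the index and fills with pd.NA.
daily_data_frame["new_column"] = pd.Series(dtype=pd.StringDtype(storage="pyarrow"))

print(daily_data_frame["new_column"].dtype)         # string[pyarrow]
print(daily_data_frame["new_column"].isna().all())  # True
```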
