In [16]:
import os
import re
from pathlib import Path
from rdflib import Graph

import numpy as np
import pandas as pd

import laspy

import pvlib
from datetime import datetime
import cv2
import json

import math
from copy import deepcopy

import trimesh
import plotly.graph_objects as go

from sklearn.decomposition import PCA
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler
import open3d as o3d

from topologicpy.Vertex import Vertex
from topologicpy.Face import Face
from topologicpy.Cell import Cell
from matplotlib import cm

import geomapi.tools as tl
from geomapi.nodes import PointCloudNode
from geomapi.utils import geometryutils as gmu

from scipy.spatial import Delaunay
from scipy.spatial import ConvexHull

from geopy.geocoders import Nominatim

import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap

import utils
from utils import utils as ut


In [4]:
current_directory=Path(os.getcwd()) 
print(f'Current folder {current_directory}')

name = 'Theatre'

path = Path(os.getcwd()).parents[0] / 'data'
pcd_input_path = path
file_name = pcd_input_path / f'{name}.las'
print(f'Pcd_input_path: {pcd_input_path}\nFile name {file_name}')

class_file = Path(os.getcwd()) / '_classes_expanded.json'
print('class_file:', class_file)

if not class_file.exists():
    print(f"class file not found: {class_file}")
else:
    print(f"class file exists: {class_file}")

Current folder c:\Users\oscar\OneDrive - Fondazione Bruno Kessler\sensor_positioning\script
Pcd_input_path: c:\Users\oscar\OneDrive - Fondazione Bruno Kessler\sensor_positioning\data
File name c:\Users\oscar\OneDrive - Fondazione Bruno Kessler\sensor_positioning\data\Theatre.las
class_file: c:\Users\oscar\OneDrive - Fondazione Bruno Kessler\sensor_positioning\script\_classes_expanded.json
class file not found: c:\Users\oscar\OneDrive - Fondazione Bruno Kessler\sensor_positioning\script\_classes_expanded.json


In [5]:
input_folder_ttl = path / 'graph_files'
graph_path = input_folder_ttl / f'{name}_graph.ttl'
graph_path_uri = graph_path.as_uri()

if graph_path.exists():
    print("Graph .ttl file uploaded!")
else:
    print("File not found!")

graph = Graph()
graph.parse(graph_path_uri, format = "turtle")
print("Graph parsed successfully")
nodes = tl.graph_to_nodes(graph)


Graph .ttl file uploaded!
Graph parsed successfully


In [6]:
mash_path = r"C:\Users\oscar\OneDrive - Fondazione Bruno Kessler\sensor_positioning\model\Theatre_volume.obj"
mesh = trimesh.load_mesh(mash_path)

model_path = Path(os.getcwd()).parents[0] / 'model' 

walls_obj_path = model_path/'Walls.obj' 
windows_obj_path =  model_path/'Windows.obj'
ceiling_obj_path =  model_path/'Ceilings.obj'

areas = ut.check_files(walls_obj_path, windows_obj_path, ceiling_obj_path) 

walls_mesh = trimesh.load(walls_obj_path)
walls_area = walls_mesh.area  # m²
windows_mesh = trimesh.load(windows_obj_path)
windows_area = windows_mesh.area  # m²
#ceiling_mesh = trimesh.load(ceiling_obj_path)


File exists: c:\Users\oscar\OneDrive - Fondazione Bruno Kessler\sensor_positioning\model\Walls.obj
File exists: c:\Users\oscar\OneDrive - Fondazione Bruno Kessler\sensor_positioning\model\Windows.obj
File not found: c:\Users\oscar\OneDrive - Fondazione Bruno Kessler\sensor_positioning\model\Ceilings.obj
Processing walls...
Processing windows...


In [7]:
vertices_file = path / 'vertices.txt'

vertices = ut.read_vertices(vertices_file) 
vertices *= 1e-3 
print(f"Loaded {len(vertices)} vertices.")

hull = ConvexHull(vertices)
faces = hull.simplices.tolist()  

volume = ut.compute_volume(vertices, faces)
print(f"Computed volume: {volume:.2f} m³")

plot = True
plot2 = False

if plot:
    ut.plot_solid(vertices, faces)

if plot2:

    if mesh.vertices.shape[0] > 0 and mesh.faces.shape[0] > 0:
        print("Mesh loaded: Valid")
    else:
        print("Mesh loaded: Invalid")

    print("Vertices:")
    print(mesh.vertices)  
    print("\nFaces:")
    print(mesh.faces) 

    mesh.vertices *= 1e-3  # Scale all the coordinates by 10^-3

    # Ensure that faces are properly formatted for Plotly
    faces = mesh.faces

    # Extract the X, Y, Z coordinates from the vertices
    x, y, z = mesh.vertices.T  # Transpose to get X, Y, Z in separate arrays

    # Create a Plotly mesh trace for the mesh
    mesh_trace = go.Mesh3d(
        x=x, y=y, z=z,
        i=faces[:, 0],  # Indices of the first vertex of each triangle
        j=faces[:, 1],  # Indices of the second vertex of each triangle
        k=faces[:, 2],  # Indices of the third vertex of each triangle
        color='red',  # Set mesh color to red
        opacity=0.5,  # Optional: Set opacity to make it more transparent
        name='Mesh'
    )

    # Read the .las point cloud file
    name = 'Theatre'
    path = Path(os.getcwd()).parents[0] / 'data'
    pcd_input_path = path / f'{name}'
    file_name = pcd_input_path / f'{name}.las'

    # Initialize an empty list for traces
    traces = [mesh_trace]  # Start with the mesh trace

    # Read LAS file data if it exists
    if file_name.exists():
        las_data = laspy.read(file_name)
        # Get the XYZ coordinates from the LAS file
        x_points = las_data.x
        y_points = las_data.y
        z_points = las_data.z

        # Check how many points are in the point cloud
        print(f"Number of points in point cloud: {len(x_points)}")

        # Optional: Scale point cloud data by 10^-3 if it's on a different scale
        x_points *= 1e-3
        y_points *= 1e-3
        z_points *= 1e-3

        # Create a Plotly scatter trace for the point cloud with larger markers
        point_cloud_trace = go.Scatter3d(
            x=x_points, y=y_points, z=z_points,
            mode='markers',
            marker=dict(size=5, color='blue', opacity=0.7),  # Increase size and set opacity
            name='Point Cloud'
        )

        # Add point cloud trace to the traces list
        traces.append(point_cloud_trace)
    else:
        print(f"LAS file not found at {file_name}")

    # Create a layout for the Plotly figure with adjusted camera view, smaller font, and larger plot
    layout = go.Layout(
        scene=dict(
            xaxis=dict(
                title='x',
                titlefont=dict(size=8),  # Smaller font for axis title
                tickfont=dict(size=8)  # Smaller font for tick labels
            ),
            yaxis=dict(
                title='y',
                titlefont=dict(size=8),  # Smaller font for axis title
                tickfont=dict(size=8)  # Smaller font for tick labels
            ),
            zaxis=dict(
                title='z',
                titlefont=dict(size=8),  # Smaller font for axis title
                tickfont=dict(size=8)  # Smaller font for tick labels
            ),
            # Optional: Adjust the camera view to zoom out and fit both mesh and point cloud
            camera=dict(
                eye=dict(x=1.5, y=1.5, z=1.5)
            ),
        ),
        title="TOPOLOGIC BIM MODEL (TBIM)",
        titlefont=dict(size=12),  # Smaller font for the main title
        width=1000,  # Increase the width of the plot
        height=800,  # Increase the height of the plot
    )

    # Create the Plotly figure with all traces
    fig = go.Figure(data=traces, layout=layout)

    # Show the plot
    fig.show()


Loaded 14 vertices.
Computed volume: 267.42 m³


WALLS AXIS

In [9]:
laz_file = Path(path / 'pcd' / 'Theatre.las')
laz = ut.load_point_cloud(laz_file)
print(f'Las file {laz}')


Las file <LasData(1.2, point fmt: <PointFormat(2, 2 bytes of extra dims)>, 25023070 points, 1 vlrs)>


In [10]:
wall_nodes, walls_object_ids = ut.load_walls_graph(graph_path)

6 walls_nodes detected!


In [11]:
walls = ut.parse_walls(graph)

print("Parsed Walls Data:")
for wall in walls:
    print(wall)


Parsed Walls Data:
{'uri': 'file:///Theatre_Walls_0', 'object_id': rdflib.term.Literal('0', datatype=rdflib.term.URIRef('http://www.w3.org/2001/XMLSchema#integer')), 'oriented_bounds': rdflib.term.Literal('[[111.25671808  16.45092469   5.0433532 ]\n [145.12126584  15.99443705   5.16276755]\n [111.2228883   16.30130185  14.06512657]\n [111.21665322  13.46572111   4.99369445]\n [145.04737119  12.85961062  14.13488216]\n [111.18282344  13.31609826  14.01546781]\n [145.08120097  13.00923347   5.11310879]\n [145.08743606  15.84481421  14.18454092]]')}
{'uri': 'file:///Theatre_Walls_1', 'object_id': rdflib.term.Literal('1', datatype=rdflib.term.URIRef('http://www.w3.org/2001/XMLSchema#integer')), 'oriented_bounds': rdflib.term.Literal('[[118.19371065  27.60882854  15.48904597]\n [117.26713131  15.10822779  13.59766649]\n [118.27005487  28.87272553   7.09823349]\n [121.03425707  27.39914767  15.48330687]\n [120.18402195  16.16244391   5.2011149 ]\n [121.11060129  28.66304466   7.09249439]\n [

In [12]:
walls_data = [
    ((111.37, 29.21), (145.16, 29.04)),  # Wall 1
    ((119.69, 28.54), (119.52, 15.87)),  # Wall 2
    ((111.86, 14.67), (144.93, 14.40)),  # Wall 3
    ((138.19, 28.24), (138.41, 15.87)),  # Wall 4
]

processed_walls = ut.process_walls(walls_data)

print("Processed Wall Information:")
for wall in processed_walls:
    print(wall)


Processed Wall Information:
{'wall_id': 1, 'start_point': (111.37, 29.21), 'end_point': (145.16, 29.04), 'length': 33.790427638607945, 'orientation': -0.2882568907193976}
{'wall_id': 2, 'start_point': (119.69, 28.54), 'end_point': (119.52, 15.87), 'length': 12.671140438018986, 'orientation': -90.76872123649262}
{'wall_id': 3, 'start_point': (111.86, 14.67), 'end_point': (144.93, 14.4), 'length': 33.07110218907136, 'orientation': -0.4677809720853918}
{'wall_id': 4, 'start_point': (138.19, 28.24), 'end_point': (138.41, 15.87), 'length': 12.371956191322372, 'orientation': -88.9811040638947}


CAMERA POSITION

In [13]:
walls = [
    ((111.37, 29.21), (145.16, 29.04)),  # Wall 1
    ((119.69, 28.54), (119.52, 15.87)),  # Wall 2
    ((111.86, 14.67), (144.93, 14.40)),  # Wall 3
    ((138.19, 28.24), (138.41, 15.87)),  # Wall 4
]

point_cloud = ut.load_point_cloud_(path / 'pcd' / 'Theatre_5mm.las')

ut.get_wall_elevation(walls, point_cloud)


Ignoring fixed y limits to fulfill fixed data aspect with adjustable data limits.
Ignoring fixed x limits to fulfill fixed data aspect with adjustable data limits.


Saved elevation view for wall 1 to wall_elevations\wall_1_elevation.png


Ignoring fixed y limits to fulfill fixed data aspect with adjustable data limits.


Saved elevation view for wall 2 to wall_elevations\wall_2_elevation.png


Ignoring fixed x limits to fulfill fixed data aspect with adjustable data limits.


Saved elevation view for wall 3 to wall_elevations\wall_3_elevation.png
Saved elevation view for wall 4 to wall_elevations\wall_4_elevation.png


In [14]:
walls = [
    ((111.37, 29.21), (145.16, 29.04)),  # Wall 1
    ((119.69, 28.54), (119.52, 15.87)),  # Wall 2
    ((111.86, 14.67), (144.93, 14.40)),  # Wall 3
    ((138.19, 28.24), (138.41, 15.87)),  # Wall 4
]

point_cloud, objects = ut.load_point_cloud_for_geometry(path / 'pcd' / 'Theatre_5mm.las')

ut.get_wall_elevation_(walls, point_cloud, objects)


Ignoring fixed y limits to fulfill fixed data aspect with adjustable data limits.
Ignoring fixed x limits to fulfill fixed data aspect with adjustable data limits.


Saved elevation view for wall 1 to wall_elevations\wall_1_elevation_ids.png


Ignoring fixed y limits to fulfill fixed data aspect with adjustable data limits.


Saved elevation view for wall 2 to wall_elevations\wall_2_elevation_ids.png


Ignoring fixed x limits to fulfill fixed data aspect with adjustable data limits.


Saved elevation view for wall 3 to wall_elevations\wall_3_elevation_ids.png
Saved elevation view for wall 4 to wall_elevations\wall_4_elevation_ids.png


In [None]:
current_directory = Path.cwd()
wall_elevation_images = Path(current_directory / "wall_elevations" / "4edges")
edge_images = Path(current_directory / "edge_images")
edge_images.mkdir(parents=True, exist_ok=True) 

canny_dbscan = True

if canny_dbscan:

    def apply_canny_and_dbscan(image_path, output_path, eps=5, min_samples=10):
        """
        Apply Canny edge detection followed by DBSCAN clustering to segment edges and save labeled pixels in JSON.

        Args:
            image_path (Path): Path to the input image.
            output_path (Path): Path to save the clustered edges.
            eps (float): Maximum distance between points to be considered a cluster (DBSCAN).
            min_samples (int): Minimum number of points required to form a cluster (DBSCAN).
        """
        try:
            # Load the image
            image = cv2.imread(str(image_path), cv2.IMREAD_UNCHANGED)
            if image is None:
                print(f"Error: Could not read image {image_path}")
                return

            # Convert to grayscale if the image is not already
            if len(image.shape) == 3 and image.shape[2] == 4:  # RGBA
                image = cv2.cvtColor(image, cv2.COLOR_BGRA2GRAY)
            elif len(image.shape) == 3:  # RGB
                image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

            # Apply Gaussian blur to reduce noise
            blurred_image = cv2.GaussianBlur(image, (5, 5), 1)

            # Apply Canny edge detection
            edges = cv2.Canny(blurred_image, threshold1=50, threshold2=150)

            # Extract edge points (non-zero coordinates)
            edge_points = np.column_stack(np.where(edges > 0))  # (row, col) as (y, x)

            # Create blank label arrays for pixels
            total_pixels = np.column_stack(np.where(np.ones_like(image)))  # All pixels in the image
            pixel_labels = np.full(image.shape, "pixels_off", dtype=object)

            if len(edge_points) == 0:
                print(f"No edges detected in {image_path}")
                return

            # Apply DBSCAN clustering to edge points
            dbscan = DBSCAN(eps=eps, min_samples=min_samples, metric="euclidean")
            labels = dbscan.fit_predict(edge_points)

            # Get unique cluster labels (exclude noise, labeled as -1)
            unique_labels = np.unique(labels)
            unique_labels = unique_labels[unique_labels >= 0]

            # Assign "pixels_on" to pixels belonging to clusters
            pixels_on_list = []
            for cluster_label in unique_labels:
                cluster_mask = (labels == cluster_label)
                cluster_points = edge_points[cluster_mask]
                pixels_on_list.extend(cluster_points.tolist())
                for point in cluster_points:
                    pixel_labels[point[0], point[1]] = "pixels_on"

            # Calculate pixels_off as the difference between total pixels and pixels_on
            pixels_on_set = set(tuple(p) for p in pixels_on_list)
            pixels_off_list = [list(map(int, p)) for p in total_pixels if tuple(p) not in pixels_on_set]

            # Convert `pixels_on` to Python-native types
            pixels_on_list = [list(map(int, p)) for p in pixels_on_list]

            # Save the pixel labels as a JSON file
            json_data = {
                "pixels_on": pixels_on_list,
                "pixels_off": pixels_off_list,
            }
            json_path = output_path / f"{image_path.stem}_labeled_pixels.json"
            with open(json_path, "w") as json_file:
                json.dump(json_data, json_file, indent=4)

            # Visualize the results
            plt.figure(figsize=(12, 6))
            plt.subplot(1, 2, 1)
            plt.title("Original Image")
            plt.imshow(image, cmap="gray")
            plt.axis("off")

            plt.subplot(1, 2, 2)
            plt.title("Canny Edges with Clustering")
            clustered_visualization = np.where(pixel_labels == "pixels_on", 255, 0).astype(np.uint8)
            plt.imshow(clustered_visualization, cmap="gray")
            plt.axis("off")

            plt.tight_layout()
            plt.show()

        except Exception as e:
            print(f"An error occurred while processing {image_path}: {e}")

    for image_file in wall_elevation_images.iterdir():
        if image_file.suffix.lower() in [".png", ".jpg", ".jpeg"]:  # Filter for image files
            apply_canny_and_dbscan(image_file, edge_images, eps = 150, min_samples = 10)



An error occurred while processing c:\Users\oscar\OneDrive - Fondazione Bruno Kessler\sensor_positioning\script\wall_elevations\4edges\polygons_wall_1_elevation.png: name 'DBSCAN' is not defined
An error occurred while processing c:\Users\oscar\OneDrive - Fondazione Bruno Kessler\sensor_positioning\script\wall_elevations\4edges\segmented_2_polygons_wall_1_elevation.png: name 'DBSCAN' is not defined
An error occurred while processing c:\Users\oscar\OneDrive - Fondazione Bruno Kessler\sensor_positioning\script\wall_elevations\4edges\wall_1_elevation.png: name 'DBSCAN' is not defined
An error occurred while processing c:\Users\oscar\OneDrive - Fondazione Bruno Kessler\sensor_positioning\script\wall_elevations\4edges\wall_1_elevation_ids.png: name 'DBSCAN' is not defined
An error occurred while processing c:\Users\oscar\OneDrive - Fondazione Bruno Kessler\sensor_positioning\script\wall_elevations\4edges\wall_3_elevation_ids.png: name 'DBSCAN' is not defined


In [13]:
image_folder = current_directory / 'wall_elevations' / '4edges'
folder_path =  image_folder / 'viridis'  
output_folder = image_folder / 'results'
os.makedirs(output_folder, exist_ok=True)  # Create the output folder if it doesn't exist

# Define Viridis colormap
viridis = cm.get_cmap('viridis', 256)  # Compatible with older Matplotlib versions
viridis_colors = (viridis(np.linspace(0, 1, 256))[:, :3] * 255).astype(np.uint8)  # Convert to RGB

num_bins = 100  
bins = np.linspace(0, 256, num_bins + 1, dtype=int)

def process_viridis_image(image_path, output_folder, viridis_colors, bins):
    # Load the image
    image = cv2.imread(image_path)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # Convert to RGB

    # Reshape the image into a 2D array of pixels [R, G, B]
    pixels = image.reshape(-1, 3)

    # Create masks for each bin
    masks = []
    for i in range(len(bins) - 1):
        lower_bound = viridis_colors[bins[i]]
        upper_bound = viridis_colors[bins[i + 1] - 1]

        # Threshold pixels to create a mask
        mask = ((pixels >= lower_bound) & (pixels <= upper_bound)).all(axis=1).reshape(image.shape[:2])
        mask = (mask * 255).astype(np.uint8)  # Convert boolean mask to uint8
        masks.append(mask)

    # Save masks
    base_filename = os.path.splitext(os.path.basename(image_path))[0]
    for i, mask in enumerate(masks):
        mask_path = os.path.join(output_folder, f"{base_filename}_mask_bin_{i}.png")
        cv2.imwrite(mask_path, mask)

    print(f"Processed {image_path}, masks saved in {output_folder}")

for filename in os.listdir(folder_path):
    file_path = os.path.join(folder_path, filename)
    if os.path.isfile(file_path) and filename.lower().endswith(('.png', '.jpg', '.jpeg')):
        process_viridis_image(file_path, output_folder, viridis_colors, bins)
        



The get_cmap function was deprecated in Matplotlib 3.7 and will be removed in 3.11. Use ``matplotlib.colormaps[name]`` or ``matplotlib.colormaps.get_cmap()`` or ``pyplot.get_cmap()`` instead.



Processed c:\Users\oscar\OneDrive - Fondazione Bruno Kessler\sensor_positioning\script\wall_elevations\4edges\viridis\wall_1_elevation.png, masks saved in c:\Users\oscar\OneDrive - Fondazione Bruno Kessler\sensor_positioning\script\wall_elevations\4edges\results
Processed c:\Users\oscar\OneDrive - Fondazione Bruno Kessler\sensor_positioning\script\wall_elevations\4edges\viridis\wall_2_elevation.png, masks saved in c:\Users\oscar\OneDrive - Fondazione Bruno Kessler\sensor_positioning\script\wall_elevations\4edges\results
Processed c:\Users\oscar\OneDrive - Fondazione Bruno Kessler\sensor_positioning\script\wall_elevations\4edges\viridis\wall_3_elevation.png, masks saved in c:\Users\oscar\OneDrive - Fondazione Bruno Kessler\sensor_positioning\script\wall_elevations\4edges\results
Processed c:\Users\oscar\OneDrive - Fondazione Bruno Kessler\sensor_positioning\script\wall_elevations\4edges\viridis\wall_4_elevation.png, masks saved in c:\Users\oscar\OneDrive - Fondazione Bruno Kessler\senso

ORIENTATION OF BUILDING

In [14]:
downsampling_factor = 1000

In [15]:
theatre_file = r"C:\Users\oscar\OneDrive - Fondazione Bruno Kessler\sensor_positioning\data\pcd\Theatre.las"
points, points_rotated, bbox_rotated, rotation_matrix = ut.compute_oriented_bounding_box(theatre_file)
decimated_points = points[::downsampling_factor]
decimated_rotated_points = points_rotated[::downsampling_factor]
rotation_matrix_inverse = np.linalg.inv(rotation_matrix)
center = np.mean(points, axis = 0)

points_aligned_with_axes = ut.rotate_points_3d(decimated_rotated_points, center, rotation_matrix_inverse)

# print(f'Bounding Box {bbox_rotated}\nRotation Matrix: {rotation_matrix}')
# print(f'Bounding Box rotated {bbox_rotated} lenght {len(bbox_rotated)} Type{type(bbox_rotated)}')


WHEATER DATA

In [16]:
weather_data = path / 'weather_data' / f'{name}.epw'

if weather_data.exists():
    print("Weather data file uploaded!")
else:
    print("File not found")
    

Weather data file uploaded!


In [17]:
points, points_rotated, bbox_rotated, rotation_matrix = ut.compute_oriented_bounding_box(theatre_file)
decimated_points = points[::downsampling_factor]
decimated_rotated_points = points_rotated[::downsampling_factor]
rotation_matrix_inverse = np.linalg.inv(rotation_matrix)
center = np.mean(points, axis = 0)

points_aligned_with_axes = ut.rotate_points_3d(decimated_rotated_points, center, rotation_matrix_inverse)

print(f'Bounding Box {bbox_rotated}\nRotation Matrix: {rotation_matrix}')
print(f'Bounding Box rotated {bbox_rotated} lenght {len(bbox_rotated)} Type{type(bbox_rotated)}')

beta_angle = ut.extract_beta_angle_from_rotation_matrix(rotation_matrix) 

rotation_matrix_building = ut.rotation_matrix_z(beta_angle)
center_of_bbox = np.mean(decimated_points, axis=0)  # Assuming you rotate around the center of the building
rotated_bbox = ut.rotate_points_3d(bbox_rotated, center_of_bbox, rotation_matrix_building) 

print (f"Exposition angle of buildings N/S: {round(beta_angle, 3)}")
print(f"Building's Rotated Bounding Box (after applying β angle rotation): {rotated_bbox[:5]} Len {len(rotated_bbox)}") 
print(f'Rotation Matrix used for β angle rotation: {rotation_matrix_building}')


Bounding Box [[-19.18031045 -10.13297167  -6.0088717 ]
 [ 16.34764637 -10.13297167  -6.0088717 ]
 [ 16.34764637   8.86224224  -6.0088717 ]
 [-19.18031045   8.86224224  -6.0088717 ]
 [-19.18031045 -10.13297167   6.88268739]
 [ 16.34764637 -10.13297167   6.88268739]
 [ 16.34764637   8.86224224   6.88268739]
 [-19.18031045   8.86224224   6.88268739]]
Rotation Matrix: [[ 0.99677407  0.07374753 -0.03166633]
 [-0.07468796  0.996767   -0.02961863]
 [ 0.02937966  0.03188817  0.99905955]]
Bounding Box rotated [[-19.18031045 -10.13297167  -6.0088717 ]
 [ 16.34764637 -10.13297167  -6.0088717 ]
 [ 16.34764637   8.86224224  -6.0088717 ]
 [-19.18031045   8.86224224  -6.0088717 ]
 [-19.18031045 -10.13297167   6.88268739]
 [ 16.34764637 -10.13297167   6.88268739]
 [ 16.34764637   8.86224224   6.88268739]
 [-19.18031045   8.86224224   6.88268739]] lenght 8 Type<class 'numpy.ndarray'>
Exposition angle of buildings N/S: -4.285
Building's Rotated Bounding Box (after applying β angle rotation): [[-16.33785

BOUNDARY CONDITIONS

In [18]:
location = ut.extract_location_from_epw(weather_data)
summer_winter_temps = ut.extract_seasonal_temperatures(weather_data)

summer_max_temp = float(summer_winter_temps['summer_max_temp'])
summer_min_temp = float(summer_winter_temps['summer_min_temp'])
winter_max_temp = float(summer_winter_temps['winter_max_temp'])
winter_min_temp = float(summer_winter_temps['winter_min_temp'])

print(f"Location: {location}")
print(f"Summer Max Temperature: {summer_winter_temps['summer_max_temp']}\nSummer Min Temperature: {summer_winter_temps['summer_min_temp']}")
print(f"Winter Max Temperature: {summer_winter_temps['winter_max_temp']}\nWinter Min Temperature: {summer_winter_temps['winter_min_temp']}")


Location: {'LocationName': 'Trento', 'Latitude': 46.1029536, 'Longitude': 11.12974249416882}
Summer Max Temperature: 30.8
Summer Min Temperature: 9.8
Winter Max Temperature: 13.0
Winter Min Temperature: -7.8


Average hourly temperature for months


In [19]:
hourly_data = ut.extract_and_clean_hourly_data(weather_data)
monthly_avg_temps = ut.compute_average_hourly_temperatures(hourly_data)
ut.display_monthly_data(monthly_avg_temps)



Month: 1
Hour   1    2    3    4    5    6    7    8    9    10  ...   15   16   17  \
Day                                                     ...                  
1     2.5  2.6  1.5  1.0  1.2  1.0  0.9  0.8  0.7 -2.7  ...  4.7  5.7  4.7   
2     2.6  1.8  1.4  1.6  2.3  2.2  1.8  0.4  0.5  0.6  ...  0.6  0.4  7.6   
3     5.1  4.6  4.3  4.0  4.0  3.5  3.3  2.8  2.0  3.8  ...  8.0  7.7  6.9   
4     3.2  3.3  2.5 -0.7 -1.8 -2.8 -3.1 -3.4 -4.2 -4.2  ...  0.9  1.4  2.0   
5     1.5 -2.8 -2.9 -4.0 -4.3 -4.8 -4.5 -4.8 -5.0 -4.3  ...  4.1  4.6  4.4   
6     8.4  0.2  6.8  5.2  2.3  1.4  0.9 -0.4 -0.5 -1.0  ...  5.7  6.3  5.4   
7     0.8  0.3  1.7  0.3 -0.6 -1.5 -1.2 -1.7 -1.2 -1.9  ...  5.5  5.1  4.9   
8     0.6  0.2 -1.4 -1.9 -2.6 -2.1 -2.1 -1.6 -1.2 -1.4  ...  3.2  3.0  2.9   
9     2.2  2.3  1.8  1.7  0.7 -3.9 -3.7  0.4 -2.1 -2.1  ...  0.4  0.5  7.9   
10    4.8  4.3  4.9  3.7  2.0  1.3  0.6  1.4  3.6  4.0  ...  0.1  0.2  0.3   
11    4.0  3.8  3.8  3.5  3.3  2.6  2.5  2.5  2.5  2.1

GEOMETRY BOUNDARY CONDITIONS

In [20]:
setpoint_summer = 24
setpoint_winter = 20

occupancies = [50, 80, 100, 120, 250] 
max_occupancy = 250

avg_wall = 0.62
u_corr = 0.50

In [21]:
wall_info = True

if wall_info:

    # Thermal conductivities (W/m·K)
    lambda1 = 2.3  # Layer 1
    lambda2 = 0.9  # Layer 2
    lambda3 = 1.3  # Layer 3
    lambda_avg = (lambda1 + lambda2 + lambda3) / 3  # Average thermal conductivity

    # Wall thicknesses (m)
    d1_wall = 0.45
    d2_wall = 0.425
    d3_wall = 0.465
    d4_wall = 0.475

    # Wall U-values
    u1_walls = lambda_avg / d1_wall
    u2_walls = lambda_avg / d2_wall
    u3_walls = lambda_avg / d3_wall
    u4_walls = lambda_avg / d4_wall

    u_walls = [u1_walls, u2_walls, u3_walls, u4_walls]
    total_u_value_walls = np.mean(u_walls)

    # Thermal conductivities of window layers (W/m·K)
    lambda1_w = 2.3   # Layer 1 (e.g., inner glass layer)
    lambda2_w = 2.89  # Layer 2 (e.g., middle air gap)
    lambda3_w = 2.93  # Layer 3 (e.g., outer glass layer)

    # Thicknesses of window layers (m)
    d1_window = 0.02   # Thickness of Layer 1
    d2_window = 0.015  # Thickness of Layer 2
    d3_window = 0.025  # Thickness of Layer 3

    # Air film resistances (m²·K/W) - default values
    R_air_inside = 0.13  # Inside air film resistance
    R_air_outside = 0.13  # Outside air film resistance

    # Calculate thermal resistances of individual layers
    R1_window = d1_window / lambda1_w  # Resistance of Layer 1
    R2_window = d2_window / lambda2_w  # Resistance of Layer 2
    R3_window = d3_window / lambda3_w  # Resistance of Layer 3

    # Total thermal resistance
    R_total = R_air_inside + R_air_outside + R1_window + R2_window + R3_window

    # U-value calculation (W/m²·K)
    total_u_value_windows = 1 / R_total

    # Print U-values
    print("U-Values:")
    print(f" - Walls: {total_u_value_walls:.2f} W/m²·K")
    print(f" - Windows: {total_u_value_windows:.2f} W/m²·K")

    # Print U-values
    print("U-Values validated:")
    print(f" - Walls: {total_u_value_walls - u_corr:2f} W/m²·K")
    print(f" - Windows: {total_u_value_windows - u_corr:2f} W/m²·K")

    total_u_value_walls = total_u_value_walls - u_corr
    total_u_value_windows = total_u_value_windows - u_corr


U-Values:
 - Walls: 3.31 W/m²·K
 - Windows: 3.54 W/m²·K
U-Values validated:
 - Walls: 2.811612 W/m²·K
 - Windows: 3.040846 W/m²·K


In [22]:
lw1 = 33.86
lw2 = 12.53
lw3 = 12.23
lw4 = 34.15
lw = [lw1, lw2, lw3, lw4]
l_walls = np.sum(lw)

h1 = 9.02
h2 = 8.39
h3 = 10.34
h4 = 9.07
hw = [h1, h2, h3, h4]
h_avg = np.sum(hw)/len(hw)

wall_area = (l_walls * h_avg) * avg_wall 
wall_area_ = ((12.40 * 2) + (17.60 *2)) * 7.46
print(f'Area walls {wall_area}, Area walls correct {wall_area_}')
window_area = 12 * (1.20 * 2.10) # m² (approximate total window area)
print(f'Area windows {window_area:2f}')


Area walls 529.447667, Area walls correct 447.6
Area windows 30.240000


In [23]:
area_walls = wall_area_
area_windows = window_area

delta_temp_winter = setpoint_winter - winter_min_temp
delta_temp_summer = summer_max_temp - setpoint_summer

# Winter
Q_walls_winter = total_u_value_walls * area_walls * delta_temp_winter  # Heat loss through walls
Q_windows_winter = total_u_value_windows * area_windows * delta_temp_winter  # Heat loss through windows

Q_total_winter = Q_walls_winter + Q_windows_winter

# Summer
Q_walls_summer = total_u_value_walls * area_walls * delta_temp_summer  # Heat loss through walls
Q_windows_summer = total_u_value_windows * area_windows * delta_temp_summer  # Heat loss through windows

Q_total_summer = Q_walls_summer + Q_windows_summer

print(f"Heat loss through walls during winter: {Q_walls_winter:.2f} W")
print(f"Heat loss through windows during winter: {Q_windows_winter:.2f} W")
print(f"--> Total heat loss during winter: {Q_total_winter:.2f} W")
print('------------------')
print(f"Heat gained through walls during winter: {Q_walls_summer:.2f} W")
print(f"Heat gained through windows during winter: {Q_windows_summer:.2f} W")
print(f"--> Total heat gained during winter: {Q_total_summer:.2f} W")


Heat loss through walls during winter: 34985.67 W
Heat loss through windows during winter: 2556.35 W
--> Total heat loss during winter: 37542.02 W
------------------
Heat gained through walls during winter: 8557.65 W
Heat gained through windows during winter: 625.30 W
--> Total heat gained during winter: 9182.94 W


HEAT GAINED / LOST

In [24]:
air_density = 1.225  # kg/m³ (standard air density)
air_specific_heat = 1005  # J/kg·K (specific heat capacity of air)
ventilation_delta_t = 5  # Example temperature difference (supply - indoor) in °C
occupancies = [20, 50, 80, 100, 120, 150, 175, 200, 250]  # Occupancy levels

# Calculate ventilation airflow rate for each occupancy
ventilation_airflow_rate = [0.01 * occupancy for occupancy in occupancies]  # 10 L/s per occupant

# Calculate mass flow rate and ventilation intensity for each occupancy
ventilation_intensities = [
    airflow_rate * air_density * air_specific_heat * ventilation_delta_t
    for airflow_rate in ventilation_airflow_rate
]

# Display results
for occupancy, intensity in zip(occupancies, ventilation_intensities):
    print(f"Occupancy: {occupancy}, Ventilation Intensity: {intensity:.2f} W")

# Ventilation spread (example value)
ventilation_spread = 2  # meters (Gaussian airflow spread)
radiator_intensity = 1200  # in W
radiator_spread = 5  # meters


Occupancy: 20, Ventilation Intensity: 1231.12 W
Occupancy: 50, Ventilation Intensity: 3077.81 W
Occupancy: 80, Ventilation Intensity: 4924.50 W
Occupancy: 100, Ventilation Intensity: 6155.62 W
Occupancy: 120, Ventilation Intensity: 7386.75 W
Occupancy: 150, Ventilation Intensity: 9233.44 W
Occupancy: 175, Ventilation Intensity: 10772.34 W
Occupancy: 200, Ventilation Intensity: 12311.25 W
Occupancy: 250, Ventilation Intensity: 15389.06 W


In [25]:
vertices = ut.read_vertices(vertices_file)
vertices *= 1e-3  # Scale to meters

radiator_positions = [
    np.array([116.82, 14.47, 7.50]),
    np.array([129.37, 15.03, 7.50]),
    np.array([121.02, 14.76, 7.50]),
    np.array([139.16, 15.03, 7.50]),

    np.array([139.30, 28.20, 7.50]),
    np.array([132.75, 28.43, 7.50]),
    np.array([124.75, 28.66, 7.50]),
    np.array([120.90, 28.69, 7.50])
]

num_radiators = len(radiator_positions)
print(f'Number of radiators: {num_radiators}')

num_ventilators = len(radiator_positions)
ventilator_positions = ut.distribute_ventilators_on_ceiling(vertices, num_ventilators)

print("Ventilator Positions (x, y, z):")
print(ventilator_positions)


Number of radiators: 8
Ventilator Positions (x, y, z):
[[111.1080837  21.6753268  16.5971623]
 [111.1080837  22.0786419  16.5971623]
 [136.0935059  21.6753268  16.5971623]
 [136.0935059  22.0786419  16.5971623]]


In [26]:
base_conditions = {
    "location": "Sample Location",
    "indoor_temp": None,  # This will be updated for each season (e.g., setpoint_summer or setpoint_winter)
    "walls": {
        "type": "no-slip", 
        "u_value": None,  # This will be updated to `total_u_value_walls` dynamically
        "temperature": None,  # Wall temperature
    },
    "windows": {
        "type": "heat_flux", 
        "u_value": 3.040846,  # Windows U-value in W/m²·K
        "value": -50,  # Cooling effect in W/m²
    },
    "air": {
        "density": 1.225,  # Air density in kg/m³
        "specific_heat_capacity": 1005,  # Air specific heat capacity in J/kg·K
        "temperature": None,  # Will be updated dynamically
        "humidity": None,  # Will be updated dynamically
    },
    "occupants": {
        "heat_source": 85,  # Heat per person in W
    }
}

seasons = {
    "Winter": {"indoor_temp": setpoint_winter, "wall_temperature": winter_min_temp + setpoint_winter/2, "u_value": total_u_value_walls},
    "Summer": {"indoor_temp": setpoint_summer, "wall_temperature": summer_max_temp - setpoint_summer/2, "u_value": total_u_value_walls},
}

for season, data in seasons.items():
    print(f"\n{season} Conditions:")
    base_conditions["indoor_temp"] = data["indoor_temp"]
    base_conditions["walls"]["u_value"] = data["u_value"]
    base_conditions["walls"]["temperature"] = data["wall_temperature"]
    print(base_conditions)

def create_scenarios(seasons, occupancies, base_conditions):
    """
    Generate scenarios for different seasons and occupancies with all relevant details for CFD analysis.
    """
    if not isinstance(seasons, dict):
        raise ValueError("Seasons must be a dictionary with season names and data.")
    if not all(isinstance(o, (int, float)) and o > 0 for o in occupancies):
        raise ValueError("Occupancies must be a list of positive numbers.")
    if "walls" not in base_conditions or "windows" not in base_conditions or "air" not in base_conditions:
        raise ValueError("Base conditions must define 'walls', 'windows', and 'air'.")

    scenarios = []

    for season, season_data in seasons.items():
        for occupancy in occupancies:
            scenario = deepcopy(base_conditions)
            scenario["season"] = season
            scenario["walls"]["temperature"] = season_data.get("wall_temperature")
            scenario["windows"]["temperature"] = season_data.get("outdoor_temp")
            scenario["air"]["temperature"] = season_data.get("outdoor_temp")
            scenario["air"]["humidity"] = season_data.get("humidity", 50)
            scenario["occupants"] = {
                "count": occupancy,
                "heat_source": occupancy * base_conditions["occupants"]["heat_source"],
                "CO2_generation_rate": 0.005 * occupancy,  # CO2 generation (L/s per person)
            }
            scenario["ventilation"] = {
                "airflow_rate": ventilation_airflow_rate[occupancies.index(occupancy)],
                "intensity": ventilation_intensities[occupancies.index(occupancy)],
                "spread": ventilation_spread,
            }
            scenarios.append(scenario)

    return scenarios

print("\nGenerated Scenarios:\n" + "-" * 50)
scenarios = create_scenarios(seasons, occupancies, base_conditions)

for i, scenario in enumerate(scenarios, start=1):
    print(f"\nScenario {i}: {scenario}")



Winter Conditions:
{'location': 'Sample Location', 'indoor_temp': 20, 'walls': {'type': 'no-slip', 'u_value': np.float64(2.8116115716235557), 'temperature': 2.2}, 'windows': {'type': 'heat_flux', 'u_value': 3.040846, 'value': -50}, 'air': {'density': 1.225, 'specific_heat_capacity': 1005, 'temperature': None, 'humidity': None}, 'occupants': {'heat_source': 85}}

Summer Conditions:
{'location': 'Sample Location', 'indoor_temp': 24, 'walls': {'type': 'no-slip', 'u_value': np.float64(2.8116115716235557), 'temperature': 18.8}, 'windows': {'type': 'heat_flux', 'u_value': 3.040846, 'value': -50}, 'air': {'density': 1.225, 'specific_heat_capacity': 1005, 'temperature': None, 'humidity': None}, 'occupants': {'heat_source': 85}}

Generated Scenarios:
--------------------------------------------------

Scenario 1: {'location': 'Sample Location', 'indoor_temp': 24, 'walls': {'type': 'no-slip', 'u_value': np.float64(2.8116115716235557), 'temperature': 2.2}, 'windows': {'type': 'heat_flux', 'u_val

A1_ IMPLEMENTATION

In [28]:
solar_info = True

if solar_info:

    epw_path = weather_data
    elevation = ut.get_elevation_from_epw(epw_path)
    print(f"Elevation: {elevation} meters")

    beta_angle = (180 - beta_angle)
    latitude = location['Latitude']
    longitude = location['Longitude']

    time = datetime.now()

    # Solar position calculation
    solar_position = pvlib.solarposition.get_solarposition(
        time, latitude, longitude, altitude=elevation
    )

    solar_zenith = solar_position['apparent_zenith'].iloc[0]
    solar_azimuth = solar_position['azimuth'].iloc[0]

    # Calculate relative airmass
    airmass_relative = pvlib.atmosphere.get_relative_airmass(solar_zenith)
    # Calculate atmospheric pressure from elevation
    pressure = pvlib.atmosphere.alt2pres(elevation)  # Atmospheric pressure in Pascals
    # Calculate absolute airmass
    airmass_absolute = pvlib.atmosphere.get_absolute_airmass(airmass_relative, pressure)
    # Assume Linke turbidity 
    linke_turbidity = 3  # Typical value for clear skies
    # Calculate clear-sky irradiance using the Ineichen model
    clearsky_model = pvlib.clearsky.ineichen(
        solar_position['apparent_zenith'],
        airmass_absolute,
        linke_turbidity,
        altitude=elevation
    )

    ghi = clearsky_model['ghi'].iloc[0]
    dni = clearsky_model['dni'].iloc[0]
    dhi = clearsky_model['dhi'].iloc[0]

    print(f"Solar Zenith: {solar_zenith}°")
    print(f"Solar Azimuth: {solar_azimuth}°")
    print(f"GHI: {ghi} W/m²") # GHI (Global Horizontal Irradiance):
    print(f"DNI: {dni} W/m²") # DNI (Direct Normal Irradiance):
    print(f"DHI: {dhi} W/m²") # DHI (Diffuse Horizontal Irradiance):


Elevation: 190.0 meters
Solar Zenith: 70.1721156020615°
Solar Azimuth: 151.09572809863892°
GHI: 284.4872461266306 W/m²
DNI: 677.132182944577 W/m²
DHI: 54.80686524632796 W/m²


In [29]:
time_hours = 8
shgc = 0.6  
air = {"u_value": 0.35}  

delta_temp_winter = setpoint_winter - winter_min_temp 
delta_temp_summer = summer_max_temp - setpoint_summer

wall_material = "medium"
if wall_material == "light":
    wall_shgc = 0.3
elif wall_material == "medium":
    wall_shgc = 0.5
else:
    wall_shgc = 0.7  

wall_shgc = 0.5


SEASON TEMPERATURES

In [30]:
with open(weather_data, 'r') as file:
    lines = file.readlines()

data_start_index = 0

for i, line in enumerate(lines):
    if line.startswith("DATA PERIODS"):
        data_start_index = i + 3
        break

column_names = [
    "Year", "Month", "Day", "Hour", "Minute", "Data Source", "Dry Bulb Temperature", 
    "Dew Point Temperature", "Relative Humidity", "Atmospheric Pressure", "Extraterrestrial Horizontal Radiation", 
    "Extraterrestrial Direct Normal Radiation", "Horizontal Infrared Radiation Intensity", 
    "Global Horizontal Radiation", "Direct Normal Radiation", "Diffuse Horizontal Radiation", 
    "Global Horizontal Illuminance", "Direct Normal Illuminance", "Diffuse Horizontal Illuminance", 
    "Zenith Luminance", "Wind Direction", "Wind Speed", "Total Sky Cover", "Opaque Sky Cover", 
    "Visibility", "Ceiling Height", "Present Weather Observation", "Present Weather Codes", 
    "Precipitable Water", "Aerosol Optical Depth", "Snow Depth", "Days Since Last Snowfall", 
    "Albedo", "Liquid Precipitation Depth", "Liquid Precipitation Quantity"
]

data = pd.read_csv(
    weather_data, 
    skiprows=data_start_index, 
    header=None, 
    names=column_names
)

metadata = data[["Year", "Month", "Day", "Hour", "Dry Bulb Temperature"]]

summer_months = [6, 7, 8]
winter_months = [1, 11, 12]
temperatures = data["Dry Bulb Temperature"]
print(temperatures)

metadata.loc[:, "Dry Bulb Temperature"] = metadata["Dry Bulb Temperature"]

summer_temperatures = metadata[metadata["Month"].isin(summer_months)]
winter_temperatures = metadata[metadata["Month"].isin(winter_months)]
print(len(summer_temperatures))

average_summer_temperature = summer_temperatures["Dry Bulb Temperature"].mean()
average_winter_temperature = winter_temperatures["Dry Bulb Temperature"].mean()
print(f"Average Summer Temperature (Celsius): {average_summer_temperature:.2f}")
print(f"Average Winter Temperature (Celsius): {average_winter_temperature:.2f}")


0       1.5
1       1.0
2       1.2
3       1.0
4       0.9
       ... 
8753    0.6
8754    1.8
8755    1.9
8756    1.9
8757    2.1
Name: Dry Bulb Temperature, Length: 8758, dtype: float64
2208
Average Summer Temperature (Celsius): 18.87
Average Winter Temperature (Celsius): 2.38


In [31]:
import pvlib
from datetime import datetime
from typing import Dict

# Constants
days_in_season = 90  
hours_in_day = 24

def wall_heat(walls_obj_path: str, ghi: float, air: Dict, wall_shgc: float, time_hours: int, setpoint: float, outdoor_temp: float, season: str):
    """
    Calculate heat gain/loss through walls for summer or winter.
    """
    try:
        walls_mesh = trimesh.load(walls_obj_path)
        walls_area = walls_mesh.area  # m²
    except Exception as e:
        raise ValueError(f"Error loading wall geometry: {e}")

    delta_temp = setpoint - outdoor_temp if season == 'winter' else outdoor_temp - setpoint

    # Conduction through walls
    conduction_heat = walls_area * delta_temp * air["u_value"]  # W

    # Solar radiance gain (only in summer or optional for winter with ghi)
    radiance_heat = ghi * walls_area * wall_shgc * time_hours if ghi and season == 'summer' else 0  # W

    # Total wall heat
    total_walls_heat = conduction_heat + radiance_heat
    average_daily_wall_heat = total_walls_heat / (hours_in_day * days_in_season)

    print(f"Total wall heat daily ({season}): {average_daily_wall_heat:.2f} W")
    return total_walls_heat

def windows_heat(windows_area: float, delta_temp: float, ghi: float, air: Dict, shgc: float, time_hours: int, season: str, occupancy: Dict, ventilation: Dict):
    """
    Calculate heat load through windows and consider occupancy and ventilation losses.
    """
    # Heat transfer through windows
    conduction_heat = windows_area * delta_temp * air["u_value"]  # W
    radiance_heat = ghi * windows_area * shgc * time_hours if season == 'summer' else 0  # W

    # Internal heat from occupants
    occupancy_gain = occupancy["count"] * occupancy["heat_source"]  # W

    # Ventilation loss
    ventilation_loss = (
        ventilation["airflow_rate"] * air["specific_heat_capacity"] * ventilation["delta_t"]
    )  # W

    # Total heat load for windows
    total_load = conduction_heat + radiance_heat + occupancy_gain - ventilation_loss
    average_daily_windows_heat = total_load / (days_in_season * hours_in_day)

    print(f"Total windows daily heat ({season}): {average_daily_windows_heat:.2f} W")
    return total_load

def calculate_ceiling_heat(ceiling_area: float, delta_temp: float, ghi: float, air: Dict, shgc: float, time_hours: int, season: str):
    """
    Calculate heat load through ceilings for summer or winter.
    """
    conduction_heat = ceiling_area * delta_temp * air["u_value"]  # W
    radiance_heat = ghi * ceiling_area * shgc * time_hours if season == 'summer' else 0  # W

    total_ceiling_heat = conduction_heat + radiance_heat
    average_daily_ceiling_heat = total_ceiling_heat / (days_in_season * hours_in_day)

    print(f"Ceiling heat load ({season}): {average_daily_ceiling_heat:.2f} W")
    return total_ceiling_heat

def calculate_heat_load(walls_obj_path: str, windows_obj_path: str, ceiling_area: float, air: Dict, occupancy: Dict, ventilation: Dict, ghi: float, shgc: float, time_hours: int, setpoint: float, outdoor_temp: float, season: str):
    """
    Calculate the total heat load for walls, windows, and ceilings for a given season.
    """
    try:
        walls_mesh = trimesh.load(walls_obj_path)
        walls_area = walls_mesh.area  # m²
        windows_mesh = trimesh.load(windows_obj_path)
        windows_area = windows_mesh.area  # m²
    except Exception as e:
        raise ValueError(f"Error loading geometry: {e}")

    delta_temp = setpoint - outdoor_temp if season == 'winter' else outdoor_temp - setpoint

    # Calculate heat for each component
    walls_heat = wall_heat(walls_obj_path, ghi, air, shgc, time_hours, setpoint, outdoor_temp, season)
    windows_heat_load = windows_heat(windows_area, delta_temp, ghi, air, shgc, time_hours, season, occupancy, ventilation)
    ceiling_heat_load = calculate_ceiling_heat(ceiling_area, delta_temp, ghi, air, shgc, time_hours, season)

    # Total heat load
    total_heat_load = walls_heat + windows_heat_load + ceiling_heat_load

    print(f"Total heat load ({season}): {total_heat_load:.2f} W")
    return total_heat_load


In [32]:
air = {"u_value": 0.5, "specific_heat_capacity": 1005, "temperature": 35, "indoor_temperature": 22}
occupancy = {"count": 4, "heat_source": 100}  # 100 W per person
ventilation = {"airflow_rate": 0.1}  # m³/s
ghi = 500  # W/m²
shgc = 0.5
time_hours = 8
ceiling_area = 50  # m²

specific_heat_capacity = 1005


In [33]:
from copy import deepcopy

def calculate_heat_load(walls, windows, air, occupancy, ventilation):
    """
    Calculate total heat load based on building parameters.
    """
    conduction = walls["u_value"] * walls["temperature"]
    window_loss = windows["u_value"] * windows["value"]
    occupancy_gain = occupancy["count"] * occupancy["heat_source"]
    ventilation_loss = ventilation["airflow_rate"] * air["specific_heat_capacity"] * (air["temperature"] - walls["temperature"])
    total_load = conduction + window_loss + occupancy_gain - ventilation_loss
    return total_load

def calculate_heat_summer(walls_obj_path, windows_obj_path, air, occupancy, ventilation, ghi, shgc, time_hours):
    # Load the mesh for walls and calculate the total surface area
    walls_mesh = trimesh.load(walls_obj_path)
    walls_area = walls_mesh.area  # Total surface area of walls in m²

    # Load the mesh for windows and calculate the total surface area
    windows_mesh = trimesh.load(windows_obj_path)
    windows_area = windows_mesh.area  # Total surface area of windows in m²

    walls_heat = walls_area * air["temperature"] * air["u_value"]  # Basic conduction formula

    windows_heat = (
        windows_area * air["temperature"] * air["u_value"] + ghi * windows_area * shgc * time_hours
    )
    total_windows_heat = windows_heat / 1000  # Convert from W to kW

    occupancy_gain = occupancy["count"] * occupancy["heat_source"]

    ventilation_loss = (
        ventilation["airflow_rate"] * air["specific_heat_capacity"] * (air["temperature"] - air["indoor_temperature"])
    )

    total_load = walls_heat + total_windows_heat + occupancy_gain - ventilation_loss

    return {
        "walls_heat": walls_heat / 1000,  # Convert to kW
        "windows_heat": total_windows_heat,
        "occupancy_gain": occupancy_gain,
        "ventilation_loss": ventilation_loss / 1000,  # Convert to kW
        "total_load": total_load / 1000,  # Convert to kW
    }

def create_scenarios(seasons, occupancies, base_conditions):
    """
    Generate scenarios for different seasons and occupancies with all relevant details for CFD and energy analysis.
    """
    scenarios = []
    for season, season_data in seasons.items():
        for occupancy in occupancies:
            scenario = deepcopy(base_conditions)
            scenario["season"] = season
            
            # Ensure wall temperature is calculated
            if season == 'Winter':
                scenario["walls"]["temperature"] = season_data.get("wall_temperature", winter_min_temp + setpoint_winter / 2)
            elif season == 'Summer':
                scenario["walls"]["temperature"] = season_data.get("wall_temperature", summer_max_temp - setpoint_summer / 2)

            # Ensure air temperature is set
            scenario["air"]["temperature"] = season_data.get("outdoor_temp", 10)  # Default to 10°C if not provided

            # Set default humidity if not provided
            scenario["air"]["humidity"] = season_data.get("humidity", 50)

            # Set occupants' heat source
            scenario["occupants"] = {
                "count": occupancy,
                "heat_source": occupancy * base_conditions["occupants"]["heat_source"],
            }

            # Set ventilation parameters
            scenario["ventilation"] = {
                "airflow_rate": 0.5 * occupancy,  # Example formula
                "spread": "uniform",  # Placeholder
            }

            # Calculate heat load
            try:
                scenario["heat_load"] = calculate_heat_load(
                    scenario["walls"], scenario["windows"], scenario["air"], scenario["occupants"], scenario["ventilation"]
                )
            except TypeError as e:
                print(f"Error in heat load calculation for scenario: {scenario}")
                raise e
            
            scenarios.append(scenario)
    return scenarios

scenarios = create_scenarios(seasons, occupancies, base_conditions)

for i, scenario in enumerate(scenarios, start=1):
    print(f"Scenario {i} | Season: {scenario['season']} | Occupancy: {scenario['occupants']['count']} | Heat Load: {scenario['heat_load']:.2f} W")


Scenario 1 | Season: Winter | Occupancy: 20 | Heat Load: -44535.86 W
Scenario 2 | Season: Winter | Occupancy: 50 | Heat Load: 16379.14 W
Scenario 3 | Season: Winter | Occupancy: 80 | Heat Load: 230294.14 W
Scenario 4 | Season: Winter | Occupancy: 100 | Heat Load: 457904.14 W
Scenario 5 | Season: Winter | Occupancy: 120 | Heat Load: 753514.14 W
Scenario 6 | Season: Winter | Occupancy: 150 | Heat Load: 1324429.14 W
Scenario 7 | Season: Winter | Occupancy: 175 | Heat Load: 1917066.64 W
Scenario 8 | Season: Winter | Occupancy: 200 | Heat Load: 2615954.14 W
Scenario 9 | Season: Winter | Occupancy: 250 | Heat Load: 4332479.14 W
Scenario 10 | Season: Summer | Occupancy: 20 | Heat Load: 122340.82 W
Scenario 11 | Season: Summer | Occupancy: 50 | Heat Load: 433500.82 W
Scenario 12 | Season: Summer | Occupancy: 80 | Heat Load: 897660.82 W
Scenario 13 | Season: Summer | Occupancy: 100 | Heat Load: 1292100.82 W
Scenario 14 | Season: Summer | Occupancy: 120 | Heat Load: 1754540.82 W
Scenario 15 | Se

In [34]:
if model_path == Path(r"C:\Users\oscar\OneDrive - Fondazione Bruno Kessler\sensor_positioning\model"):
   
    walls_obj_path = model_path / "Walls.obj"
    windows_obj_path = model_path / "Windows.obj"

    print(f"Current Working Directory: {os.getcwd()}")
    print(f"Walls Path: {walls_obj_path}")
    print(f"Windows Path: {windows_obj_path}")
    print(f"Walls.obj exists: {walls_obj_path.exists()}")
    print(f"Windows.obj exists: {windows_obj_path.exists()}")

    if walls_obj_path.exists():
        walls_mesh = trimesh.load(walls_obj_path)
        walls_area_adjusted = (walls_mesh.area * 10e-7)/2
        print(f"Walls Area (adjusted): {walls_area_adjusted}")
    else:
        print("Walls.obj not found.")

    if windows_obj_path.exists():
        windows_mesh = trimesh.load(windows_obj_path)
        windows_area_adjusted = (windows_mesh.area * 10e-7)/2
        print(f"Windows Area (adjusted): {windows_area_adjusted}")
    else:
        print("Windows.obj not found.")


Current Working Directory: c:\Users\oscar\OneDrive - Fondazione Bruno Kessler\sensor_positioning\script
Walls Path: c:\Users\oscar\OneDrive - Fondazione Bruno Kessler\sensor_positioning\model\Walls.obj
Windows Path: c:\Users\oscar\OneDrive - Fondazione Bruno Kessler\sensor_positioning\model\Windows.obj
Walls.obj exists: True
Windows.obj exists: True
Walls Area (adjusted): 426.9605893214077
Windows Area (adjusted): 15.09754096121111


In [35]:
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path
from datetime import datetime, timedelta
import json

from math import ceil

def load_mesh_area(mesh_path: Path, default_area: float = 50.0) -> float:
    """
    Load a mesh file and return its area. If the file is missing, return a default area.
    """
    if mesh_path.exists():
        try:
            mesh = trimesh.load(mesh_path)
            return mesh.area
        except Exception as e:
            print(f"Error loading mesh from {mesh_path}: {e}. Using default area: {default_area}")
    else:
        print(f"Warning: Mesh file '{mesh_path}' not found. Using default area: {default_area}")
        
    return default_area

def load_cached_areas(model_path):
    """
    Load and cache mesh areas for walls and windows.
    """
    try:
        walls_area = load_mesh_area(model_path / 'Walls.obj', default_area=100.0) * 10e-6
        windows_area = load_mesh_area(model_path / 'Windows.obj', default_area=25.0) * 10e-6
    except Exception as e:
        print(f"Error loading mesh areas: {e}")
        walls_area = 100.0 * 10e-6
        windows_area = 25.0 * 10e-6
    return walls_area, windows_area

def calculate_heat_balance(
    walls_obj_path: Path,
    windows_obj_path: Path,
    air: dict,
    occupancy: dict,
    ventilation: dict,
    ghi: float,
    shgc: float,
    time_hours: int,
    setpoint: float,
    outdoor_temp: float,
    season: str,
    ceiling_area: float = 50.0,
    ceiling_u_value: float = 0.3
):
    """
    Calculate heat loss (Winter) or heat gain (Summer) based on building components and seasonal conditions.
    """
    # Load wall and window areas
    walls_area = load_mesh_area(walls_obj_path, default_area=100.0) * 10e-6  # Convert mm² to m²
    windows_area = load_mesh_area(windows_obj_path, default_area=25.0) * 10e-6  # Convert mm² to m²
    print(f"Walls area: {walls_area:.2f} m²")
    print(f"Windows area: {windows_area:.2f} m²")

    # Determine delta temperature based on the season
    delta_temp = setpoint - outdoor_temp if season.lower() == "winter" else outdoor_temp - setpoint
    print(f"Delta temperature ({season}): {delta_temp:.2f} °C")

    # Heat loss/gain through walls
    walls_heat = walls_area * delta_temp * air["u_value"]
    print(f"Walls heat transfer: {walls_heat:.2f} W")

    # Heat loss/gain through windows
    windows_heat = windows_area * delta_temp * air["u_value"]
    print(f"Windows heat transfer: {windows_heat:.2f} W")

    # Heat loss/gain through ceiling
    ceiling_heat = ceiling_area * delta_temp * ceiling_u_value
    print(f"Ceiling heat transfer: {ceiling_heat:.2f} W")

    # Solar radiance heat gain (applicable only for summer)
    solar_heat = ghi * windows_area * shgc * time_hours if season.lower() == "summer" else 0
    print(f"Solar heat gain (Summer): {solar_heat:.2f} W")

    # Internal heat gain from occupants
    internal_heat = occupancy["count"] * occupancy["heat_source"]
    print(f"Internal heat gain: {internal_heat:.2f} W")

    # Heat loss/gain through ventilation
    ventilation_heat = ventilation["airflow_rate"] * air["specific_heat_capacity"] * delta_temp
    print(f"Ventilation heat transfer: {ventilation_heat:.2f} W")

    # Calculate total heat loss (winter) or heat gain (summer)
    if season.lower() == "winter":
        total_heat_loss = (
            internal_heat
            - walls_heat
            - windows_heat
            - ceiling_heat
            - ventilation_heat
        )
        print(f"Total heat loss (Winter): {total_heat_loss:.2f} W")
        return {
            "Walls Heat Loss": -walls_heat,
            "Windows Heat Loss": -windows_heat,
            "Ceiling Heat Loss": -ceiling_heat,
            "Ventilation Heat Loss": -ventilation_heat,
            "Internal Heat Gain": internal_heat,
            "Total Heat Loss (Winter)": total_heat_loss,
        }
    elif season.lower() == "summer":
        total_heat_gain = (
            walls_heat
            + windows_heat
            + ceiling_heat
            + solar_heat
            + internal_heat
            - ventilation_heat
        )
        print(f"Total heat gain (Summer): {total_heat_gain:.2f} W")
        return {
            "Walls Heat Gain": walls_heat,
            "Windows Heat Gain": windows_heat,
            "Ceiling Heat Gain": ceiling_heat,
            "Solar Heat Gain": solar_heat,
            "Internal Heat Gain": internal_heat,
            "Ventilation Heat Loss": -ventilation_heat,
            "Total Heat Gain (Summer)": total_heat_gain,
        }
    else:
        raise ValueError(f"Invalid season: {season}. Must be 'winter' or 'summer'.")

def calculate_scenario(scenario, hourly_temperatures, walls_area, windows_area):
    """
    Calculate heat load for a single scenario.
    """
    season = scenario["season"]
    air = {
        "u_value": scenario["walls"]["u_value"],
        "specific_heat_capacity": specific_heat_capacity,
        "temperature": scenario["air"]["temperature"],
        "indoor_temperature": scenario.get("indoor_temp", setpoint_summer if season == 'summer' else setpoint_winter),
    }
    occupancy = {
        "count": scenario["occupants"]["count"],
        "heat_source": scenario["occupants"]["heat_source"],
    }
    ventilation = scenario.get("ventilation", {"airflow_rate": 0.01 * occupancy["count"], "delta_t": 5.0})
    ghi = scenario.get("ghi", 500)
    shgc = scenario.get("shgc", 0.5)
    time_hours = 1

    start_time = datetime.strptime("07:00", "%H:%M")
    end_time = datetime.strptime("19:00", "%H:%M")
    current_time = start_time
    hourly_results = []

    while current_time <= end_time:
        hour = current_time.hour
        outdoor_temp = hourly_temperatures.get(hour, scenario["air"]["temperature"])
        current_occupancy = occupancy["count"] if 7 <= hour <= 19 else 0
        heat_load = calculate_heat_balance(
            walls_obj_path=None,
            windows_obj_path=None,
            air=air,
            occupancy={"count": current_occupancy, "heat_source": occupancy["heat_source"]},
            ventilation=ventilation,
            ghi=ghi,
            shgc=shgc,
            time_hours=time_hours,
            setpoint=air["indoor_temperature"],
            outdoor_temp=outdoor_temp,
            season=season,
            ceiling_area=50.0,
            ceiling_u_value=0.3,
            walls_area=walls_area,
            windows_area=windows_area,
        )
        hourly_results.append({
            "hour": current_time.strftime("%H:%M"),
            "outdoor_temp": outdoor_temp,
            "occupancy": current_occupancy,
            "heat_load": heat_load,
        })
        current_time += timedelta(hours=1)

    total_heat_load = sum(
        hr["heat_load"]["Total Heat Loss (Winter)" if season.lower() == "winter" else "Total Heat Gain (Summer)"]
        for hr in hourly_results
    )

    return {
        "season": season,
        "occupancy": occupancy["count"],
        "walls_u_value": air["u_value"],
        "windows_u_value": scenario["windows"]["u_value"],
        "ghi": ghi,
        "shgc": shgc,
        "hourly_results": hourly_results,
        "total_heat_load": total_heat_load,
    }

def calculate_scenario(scenario, hourly_temperatures, walls_area, windows_area):
    """
    Calculate heat load for a single scenario.
    """
    season = scenario["season"]
    air = {
        "u_value": scenario["walls"]["u_value"],
        "specific_heat_capacity": specific_heat_capacity,
        "temperature": scenario["air"]["temperature"],
        "indoor_temperature": scenario.get("indoor_temp", setpoint_summer if season == 'summer' else setpoint_winter),
    }
    occupancy = {
        "count": scenario["occupants"]["count"],
        "heat_source": scenario["occupants"]["heat_source"],
    }
    ventilation = scenario.get("ventilation", {"airflow_rate": 0.01 * occupancy["count"], "delta_t": 5.0})
    ghi = scenario.get("ghi", 500)
    shgc = scenario.get("shgc", 0.5)
    time_hours = 1

    start_time = datetime.strptime("07:00", "%H:%M")
    end_time = datetime.strptime("19:00", "%H:%M")
    current_time = start_time
    hourly_results = []

    while current_time <= end_time:
        hour = current_time.hour
        outdoor_temp = hourly_temperatures.get(hour, scenario["air"]["temperature"])
        current_occupancy = occupancy["count"] if 7 <= hour <= 19 else 0
        heat_load = calculate_heat_balance(
            walls_obj_path=None,
            windows_obj_path=None,
            air=air,
            occupancy={"count": current_occupancy, "heat_source": occupancy["heat_source"]},
            ventilation=ventilation,
            ghi=ghi,
            shgc=shgc,
            time_hours=time_hours,
            setpoint=air["indoor_temperature"],
            outdoor_temp=outdoor_temp,
            season=season,
            ceiling_area=50.0,
            ceiling_u_value=0.3,
            walls_area=walls_area,
            windows_area=windows_area,
        )
        hourly_results.append({
            "hour": current_time.strftime("%H:%M"),
            "outdoor_temp": outdoor_temp,
            "occupancy": current_occupancy,
            "heat_load": heat_load,
        })
        current_time += timedelta(hours=1)

    total_heat_load = sum(
        hr["heat_load"]["Total Heat Loss (Winter)" if season.lower() == "winter" else "Total Heat Gain (Summer)"]
        for hr in hourly_results
    )

    return {
        "season": season,
        "occupancy": occupancy["count"],
        "walls_u_value": air["u_value"],
        "windows_u_value": scenario["windows"]["u_value"],
        "ghi": ghi,
        "shgc": shgc,
        "hourly_results": hourly_results,
        "total_heat_load": total_heat_load,
    }

def calculate_all_scenarios_batched(scenarios, hourly_temperatures, model_path, batch_size=5):
    """
    Split scenarios into batches and process each batch.
    """
    walls_area, windows_area = load_cached_areas(model_path)
    total_batches = ceil(len(scenarios) / batch_size)
    results = []

    for batch_num in range(total_batches):
        batch = scenarios[batch_num * batch_size: (batch_num + 1) * batch_size]
        print(f"Processing batch {batch_num + 1} of {total_batches}...")

        with ProcessPoolExecutor() as executor:
            tasks = [
                executor.submit(calculate_scenario, scenario, hourly_temperatures, walls_area, windows_area)
                for scenario in batch
            ]
            results.extend([task.result() for task in tasks])

        # Save intermediate results after each batch
        with open(f"results_batch_{batch_num + 1}.json", "w") as file:
            json.dump(results, file)

    return results


In [36]:
from pathlib import Path
from datetime import datetime, timedelta
import numpy as np
from tqdm import tqdm

# Precompute constants
def load_mesh_area(mesh_path: Path, default_area: float = 50.0) -> float:
    """
    Load a mesh file and return its area. If the file is missing, return a default area.
    """
    if mesh_path.exists():
        try:
            mesh = trimesh.load(mesh_path)
            return mesh.area
        except Exception as e:
            print(f"Error loading mesh from {mesh_path}: {e}. Using default area: {default_area}")
    else:
        print(f"Warning: Mesh file '{mesh_path}' not found. Using default area: {default_area}")
    return default_area

def calculate_heat_balance_precomputed(
    walls_heat_coeff, windows_heat_coeff, ceiling_area, ceiling_u_value, 
    ventilation_coeff, ghi, shgc, time_hours, setpoint, outdoor_temp, season):
    """
    Calculate heat loss (Winter) or heat gain (Summer) based on precomputed coefficients.
    """
    # Temperature difference
    delta_temp = setpoint - outdoor_temp if season.lower() == "winter" else outdoor_temp - setpoint

    # Heat transfer through components
    walls_heat = walls_heat_coeff * delta_temp
    windows_heat = windows_heat_coeff * delta_temp
    ceiling_heat = ceiling_area * ceiling_u_value * delta_temp
    ventilation_heat = ventilation_coeff * delta_temp
    solar_heat = ghi * windows_heat_coeff * shgc * time_hours if season.lower() == "summer" else 0

    # Total heat load
    if season.lower() == "winter":
        total_heat_load = -walls_heat - windows_heat - ceiling_heat - ventilation_heat
    else:  # Summer
        total_heat_load = walls_heat + windows_heat + ceiling_heat + solar_heat - ventilation_heat

    return {
        "Walls Heat": walls_heat,
        "Windows Heat": windows_heat,
        "Ceiling Heat": ceiling_heat,
        "Ventilation Heat": ventilation_heat,
        "Solar Heat": solar_heat,
        "Total Heat Load": total_heat_load,
    }

def interpolate_hourly(results, start, end):
    """
    Interpolates heat loads for missing hours.
    """
    interpolated = []
    for hour in range(start, end + 1):
        existing = next((res for res in results if int(res["hour"].split(":")[0]) == hour), None)
        if existing:
            interpolated.append(existing)
        else:
            # Simple linear interpolation
            prev = next(res for res in reversed(results) if int(res["hour"].split(":")[0]) < hour)
            next_res = next(res for res in results if int(res["hour"].split(":")[0]) > hour)
            fraction = (hour - int(prev["hour"].split(":")[0])) / (
                int(next_res["hour"].split(":")[0]) - int(prev["hour"].split(":")[0])
            )
            interpolated_value = {
                key: prev[key] + fraction * (next_res[key] - prev[key]) if isinstance(prev[key], (int, float)) else prev[key]
                for key in prev
            }
            interpolated.append(interpolated_value)
    return interpolated

def calculate_hourly_sampled(
    scenario, representative_hours, hourly_temperatures, walls_area, windows_area):
    """
    Calculate heat load for a single scenario using representative hours.
    """
    season = scenario["season"]
    air = {
        "u_value": scenario["walls"]["u_value"],
        "specific_heat_capacity": scenario["air"]["specific_heat_capacity"],
        "indoor_temperature": scenario.get("indoor_temp", 20.0 if season == 'winter' else 24.0),
    }
    occupancy = {
        "count": scenario["occupants"]["count"],
        "heat_source": scenario["occupants"]["heat_source"],
    }
    ventilation = scenario.get("ventilation", {"airflow_rate": 0.01 * occupancy["count"], "delta_t": 5.0})
    ghi = scenario.get("ghi", 500)
    shgc = scenario.get("shgc", 0.5)

    # Precompute heat coefficients
    walls_heat_coeff = walls_area * air["u_value"]
    windows_heat_coeff = windows_area * air["u_value"]
    ventilation_coeff = ventilation["airflow_rate"] * air["specific_heat_capacity"]

    results = []
    for hour in representative_hours:
        outdoor_temp = hourly_temperatures.get(hour, scenario["air"]["temperature"])
        current_occupancy = occupancy["count"] if 7 <= hour <= 19 else 0
        heat_load = calculate_heat_balance_precomputed(
            walls_heat_coeff, windows_heat_coeff, 50.0, 0.3, ventilation_coeff,
            ghi, shgc, 1, air["indoor_temperature"], outdoor_temp, season,
        )
        results.append({
            "hour": f"{hour:02d}:00",  # Format hour as "HH:MM"
            "outdoor_temp": outdoor_temp,
            "occupancy": current_occupancy,
            "heat_load": heat_load,
        })

    # Interpolate missing hours
    interpolated_results = interpolate_hourly(results, 7, 19)
    return interpolated_results

def calculate_all_scenarios(scenarios, hourly_temperatures, model_path):
    """
    Process all scenarios with optimizations for speed.
    """
    # Preload areas
    walls_area = load_mesh_area(model_path / 'Walls.obj', default_area=100.0) * 1e-6
    windows_area = load_mesh_area(model_path / 'Windows.obj', default_area=25.0) * 1e-6

    representative_hours = [7, 12, 19]  # Reduced hours for sampling
    results = []
    for scenario in tqdm(scenarios, desc="Processing Scenarios"):
        scenario_result = calculate_hourly_sampled(
            scenario, representative_hours, hourly_temperatures, walls_area, windows_area)
        results.append(scenario_result)
    return results

model_path = Path(r"C:\Users\oscar\OneDrive - Fondazione Bruno Kessler\sensor_positioning\model")
scenarios = [
    {
        "season": "winter",
        "walls": {"u_value": 0.3},
        "windows": {"u_value": 1.2},
        "air": {"specific_heat_capacity": 1005, "temperature": -5.0},
        "occupants": {"count": 4, "heat_source": 100},
        "ventilation": {"airflow_rate": 0.01, "delta_t": 5.0},
        "ghi": 200,
        "shgc": 0.5,
    },
    # Add more scenarios as needed
]
hourly_temperatures = {7: -5.0, 12: -2.0, 19: -6.0}

# Run calculations
results = calculate_all_scenarios(scenarios, hourly_temperatures, model_path)



Processing Scenarios: 100%|██████████| 1/1 [00:00<00:00, 2046.00it/s]
